| //! Compressed/slow/row-oriented storage for documents. |
| //! |
| //! A field needs to be marked as stored in the schema in |
| //! order to be handled in the `Store`. |
| //! |
| //! Internally, documents (or rather their stored fields) are serialized to a buffer. |
| //! When the buffer exceeds 16K, the buffer is compressed using `brotli`, `LZ4` or `snappy` |
| //! and the resulting block is written to disk. |
| //! |
| //! One can then request for a specific `DocId`. |
//! A skip list helps navigate to the right block,
| //! decompresses it entirely and returns the document within it. |
| //! |
| //! If the last document requested was in the same block, |
| //! the reader is smart enough to avoid decompressing |
//! the block a second time, but there is no real
//! uncompressed block cache.
| //! |
| //! A typical use case for the store is, once |
| //! the search result page has been computed, returning |
//! the actual content of the 10 best documents.
| //! |
| //! # Usage |
| //! |
| //! Most users should not access the `StoreReader` directly |
| //! and should rely on either |
| //! |
| //! - at the segment level, the |
| //! [`SegmentReader`'s `doc` method](../struct.SegmentReader.html#method.doc) |
| //! - at the index level, the [`Searcher::doc()`](crate::Searcher::doc) method |
| //! |
| //! ! |
| |
| mod compressors; |
| mod decompressors; |
| mod footer; |
| mod index; |
| mod reader; |
| mod writer; |
| pub use self::compressors::{Compressor, ZstdCompressor}; |
| pub use self::decompressors::Decompressor; |
| pub(crate) use self::reader::DOCSTORE_CACHE_CAPACITY; |
| pub use self::reader::{CacheStats, StoreReader}; |
| pub use self::writer::StoreWriter; |
| mod store_compressor; |
| |
| /// Doc store version in footer to handle format changes. |
| pub(crate) const DOC_STORE_VERSION: u32 = 1; |
| |
| #[cfg(feature = "lz4-compression")] |
| mod compression_lz4_block; |
| |
| #[cfg(feature = "brotli-compression")] |
| mod compression_brotli; |
| |
| #[cfg(feature = "snappy-compression")] |
| mod compression_snap; |
| |
| #[cfg(feature = "zstd-compression")] |
| mod compression_zstd_block; |
| |
| #[cfg(test)] |
| pub mod tests { |
| |
| use std::path::Path; |
| |
| use super::*; |
| use crate::directory::{Directory, RamDirectory, WritePtr}; |
| use crate::fastfield::AliveBitSet; |
| use crate::schema::{self, Document, Schema, TextFieldIndexing, TextOptions, STORED, TEXT}; |
| use crate::{Index, Term}; |
| |
    /// Sample stored-field payload. Long enough that a few hundred copies of it
    /// span multiple doc store blocks (used by the merge/stacking tests).
    const LOREM: &str = "Doc Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do \
                         eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad \
                         minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip \
                         ex ea commodo consequat. Duis aute irure dolor in reprehenderit in \
                         voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur \
                         sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt \
                         mollit anim id est laborum.";

    /// Doc store block size used by the tests — matches the 16K threshold
    /// described in the module docs.
    const BLOCK_SIZE: usize = 16_384;
| |
| pub fn write_lorem_ipsum_store( |
| writer: WritePtr, |
| num_docs: usize, |
| compressor: Compressor, |
| blocksize: usize, |
| separate_thread: bool, |
| ) -> Schema { |
| let mut schema_builder = Schema::builder(); |
| let field_body = schema_builder.add_text_field("body", TextOptions::default().set_stored()); |
| let field_title = |
| schema_builder.add_text_field("title", TextOptions::default().set_stored()); |
| let schema = schema_builder.build(); |
| { |
| let mut store_writer = |
| StoreWriter::new(writer, compressor, blocksize, separate_thread).unwrap(); |
| for i in 0..num_docs { |
| let mut doc = Document::default(); |
| doc.add_field_value(field_body, LOREM.to_string()); |
| doc.add_field_value(field_title, format!("Doc {i}")); |
| store_writer.store(&doc, &schema).unwrap(); |
| } |
| store_writer.close().unwrap(); |
| } |
| schema |
| } |
| |
| const NUM_DOCS: usize = 1_000; |
| #[test] |
| fn test_doc_store_iter_with_delete_bug_1077() -> crate::Result<()> { |
| // this will cover deletion of the first element in a checkpoint |
| let deleted_doc_ids = (200..300).collect::<Vec<_>>(); |
| let alive_bitset = |
| AliveBitSet::for_test_from_deleted_docs(&deleted_doc_ids, NUM_DOCS as u32); |
| |
| let path = Path::new("store"); |
| let directory = RamDirectory::create(); |
| let store_wrt = directory.open_write(path)?; |
| let schema = |
| write_lorem_ipsum_store(store_wrt, NUM_DOCS, Compressor::Lz4, BLOCK_SIZE, true); |
| let field_title = schema.get_field("title").unwrap(); |
| let store_file = directory.open_read(path)?; |
| let store = StoreReader::open(store_file, 10)?; |
| for i in 0..NUM_DOCS as u32 { |
| assert_eq!( |
| *store |
| .get(i)? |
| .get_first(field_title) |
| .unwrap() |
| .as_text() |
| .unwrap(), |
| format!("Doc {}", i) |
| ); |
| } |
| |
| for (_, doc) in store.iter(Some(&alive_bitset)).enumerate() { |
| let doc = doc?; |
| let title_content = doc.get_first(field_title).unwrap().as_text().unwrap(); |
| if !title_content.starts_with("Doc ") { |
| panic!("unexpected title_content {}", title_content); |
| } |
| |
| let id = title_content |
| .strip_prefix("Doc ") |
| .unwrap() |
| .parse::<u32>() |
| .unwrap(); |
| if alive_bitset.is_deleted(id) { |
| panic!("unexpected deleted document {}", id); |
| } |
| } |
| |
| Ok(()) |
| } |
| |
| fn test_store( |
| compressor: Compressor, |
| blocksize: usize, |
| separate_thread: bool, |
| ) -> crate::Result<()> { |
| let path = Path::new("store"); |
| let directory = RamDirectory::create(); |
| let store_wrt = directory.open_write(path)?; |
| let schema = |
| write_lorem_ipsum_store(store_wrt, NUM_DOCS, compressor, blocksize, separate_thread); |
| let field_title = schema.get_field("title").unwrap(); |
| let store_file = directory.open_read(path)?; |
| let store = StoreReader::open(store_file, 10)?; |
| for i in 0..NUM_DOCS as u32 { |
| assert_eq!( |
| *store |
| .get(i)? |
| .get_first(field_title) |
| .unwrap() |
| .as_text() |
| .unwrap(), |
| format!("Doc {}", i) |
| ); |
| } |
| for (i, doc) in store.iter(None).enumerate() { |
| assert_eq!( |
| *doc?.get_first(field_title).unwrap().as_text().unwrap(), |
| format!("Doc {}", i) |
| ); |
| } |
| Ok(()) |
| } |
| |
    // Uncompressed store, serialized on the calling thread.
    #[test]
    fn test_store_no_compression_same_thread() -> crate::Result<()> {
        test_store(Compressor::None, BLOCK_SIZE, false)
    }
| |
    // Uncompressed store, serialized on a separate thread.
    #[test]
    fn test_store_no_compression() -> crate::Result<()> {
        test_store(Compressor::None, BLOCK_SIZE, true)
    }
| |
    // LZ4 block compression round-trip.
    #[cfg(feature = "lz4-compression")]
    #[test]
    fn test_store_lz4_block() -> crate::Result<()> {
        test_store(Compressor::Lz4, BLOCK_SIZE, true)
    }
    // Snappy compression round-trip.
    #[cfg(feature = "snappy-compression")]
    #[test]
    fn test_store_snap() -> crate::Result<()> {
        test_store(Compressor::Snappy, BLOCK_SIZE, true)
    }
    // Brotli compression round-trip.
    #[cfg(feature = "brotli-compression")]
    #[test]
    fn test_store_brotli() -> crate::Result<()> {
        test_store(Compressor::Brotli, BLOCK_SIZE, true)
    }
| |
    // Zstd compression round-trip, with default compression settings.
    #[cfg(feature = "zstd-compression")]
    #[test]
    fn test_store_zstd() -> crate::Result<()> {
        test_store(
            Compressor::Zstd(ZstdCompressor::default()),
            BLOCK_SIZE,
            true,
        )
    }
| |
| #[test] |
| fn test_store_with_delete() -> crate::Result<()> { |
| let mut schema_builder = schema::Schema::builder(); |
| |
| let text_field_options = TextOptions::default() |
| .set_indexing_options( |
| TextFieldIndexing::default() |
| .set_index_option(schema::IndexRecordOption::WithFreqsAndPositions), |
| ) |
| .set_stored(); |
| let text_field = schema_builder.add_text_field("text_field", text_field_options); |
| let schema = schema_builder.build(); |
| let index_builder = Index::builder().schema(schema); |
| |
| let index = index_builder.create_in_ram()?; |
| |
| { |
| let mut index_writer = index.writer_for_tests().unwrap(); |
| index_writer.add_document(doc!(text_field=> "deleteme"))?; |
| index_writer.add_document(doc!(text_field=> "deletemenot"))?; |
| index_writer.add_document(doc!(text_field=> "deleteme"))?; |
| index_writer.add_document(doc!(text_field=> "deletemenot"))?; |
| index_writer.add_document(doc!(text_field=> "deleteme"))?; |
| |
| index_writer.delete_term(Term::from_field_text(text_field, "deleteme")); |
| index_writer.commit()?; |
| } |
| |
| let searcher = index.reader()?.searcher(); |
| let reader = searcher.segment_reader(0); |
| let store = reader.get_store_reader(10)?; |
| for doc in store.iter(reader.alive_bitset()) { |
| assert_eq!( |
| *doc?.get_first(text_field).unwrap().as_text().unwrap(), |
| "deletemenot".to_string() |
| ); |
| } |
| Ok(()) |
| } |
| |
| #[cfg(feature = "snappy-compression")] |
| #[cfg(feature = "lz4-compression")] |
| #[test] |
| fn test_merge_with_changed_compressor() -> crate::Result<()> { |
| let mut schema_builder = schema::Schema::builder(); |
| |
| let text_field = schema_builder.add_text_field("text_field", TEXT | STORED); |
| let schema = schema_builder.build(); |
| let index_builder = Index::builder().schema(schema); |
| |
| let mut index = index_builder.create_in_ram().unwrap(); |
| index.settings_mut().docstore_compression = Compressor::Lz4; |
| { |
| let mut index_writer = index.writer_for_tests().unwrap(); |
| // put enough data create enough blocks in the doc store to be considered for stacking |
| for _ in 0..200 { |
| index_writer.add_document(doc!(text_field=> LOREM))?; |
| } |
| assert!(index_writer.commit().is_ok()); |
| for _ in 0..200 { |
| index_writer.add_document(doc!(text_field=> LOREM))?; |
| } |
| assert!(index_writer.commit().is_ok()); |
| } |
| assert_eq!( |
| index.reader().unwrap().searcher().segment_readers()[0] |
| .get_store_reader(10) |
| .unwrap() |
| .decompressor(), |
| Decompressor::Lz4 |
| ); |
| // Change compressor, this disables stacking on merging |
| let index_settings = index.settings_mut(); |
| index_settings.docstore_compression = Compressor::Snappy; |
| // Merging the segments |
| { |
| let segment_ids = index |
| .searchable_segment_ids() |
| .expect("Searchable segments failed."); |
| let mut index_writer = index.writer_for_tests().unwrap(); |
| assert!(index_writer.merge(&segment_ids).wait().is_ok()); |
| assert!(index_writer.wait_merging_threads().is_ok()); |
| } |
| |
| let searcher = index.reader().unwrap().searcher(); |
| assert_eq!(searcher.segment_readers().len(), 1); |
| let reader = searcher.segment_readers().iter().last().unwrap(); |
| let store = reader.get_store_reader(10).unwrap(); |
| |
| for doc in store.iter(reader.alive_bitset()).take(50) { |
| assert_eq!( |
| *doc?.get_first(text_field).unwrap().as_text().unwrap(), |
| LOREM.to_string() |
| ); |
| } |
| assert_eq!(store.decompressor(), Decompressor::Snappy); |
| |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_merge_of_small_segments() -> crate::Result<()> { |
| let mut schema_builder = schema::Schema::builder(); |
| |
| let text_field = schema_builder.add_text_field("text_field", TEXT | STORED); |
| let schema = schema_builder.build(); |
| let index_builder = Index::builder().schema(schema); |
| |
| let index = index_builder.create_in_ram().unwrap(); |
| |
| { |
| let mut index_writer = index.writer_for_tests()?; |
| index_writer.add_document(doc!(text_field=> "1"))?; |
| index_writer.commit()?; |
| index_writer.add_document(doc!(text_field=> "2"))?; |
| index_writer.commit()?; |
| index_writer.add_document(doc!(text_field=> "3"))?; |
| index_writer.commit()?; |
| index_writer.add_document(doc!(text_field=> "4"))?; |
| index_writer.commit()?; |
| index_writer.add_document(doc!(text_field=> "5"))?; |
| index_writer.commit()?; |
| } |
| // Merging the segments |
| { |
| let segment_ids = index.searchable_segment_ids()?; |
| let mut index_writer = index.writer_for_tests()?; |
| index_writer.merge(&segment_ids).wait()?; |
| index_writer.wait_merging_threads()?; |
| } |
| |
| let searcher = index.reader()?.searcher(); |
| assert_eq!(searcher.segment_readers().len(), 1); |
| let reader = searcher.segment_readers().iter().last().unwrap(); |
| let store = reader.get_store_reader(10)?; |
| assert_eq!(store.block_checkpoints().count(), 1); |
| Ok(()) |
| } |
| } |
| |
| #[cfg(all(test, feature = "unstable"))] |
| mod bench { |
| |
| use std::path::Path; |
| |
| use test::Bencher; |
| |
| use super::tests::write_lorem_ipsum_store; |
| use crate::directory::{Directory, RamDirectory}; |
| use crate::store::{Compressor, StoreReader}; |
| |
    // Benchmarks writing (and compressing) a 1000-doc store into a
    // RamDirectory. The store file is deleted at the end of every iteration so
    // successive writes do not collide.
    #[bench]
    #[cfg(feature = "mmap")]
    fn bench_store_encode(b: &mut Bencher) {
        let directory = RamDirectory::create();
        let path = Path::new("store");
        b.iter(|| {
            write_lorem_ipsum_store(
                directory.open_write(path).unwrap(),
                1_000,
                Compressor::default(),
                16_384,
                true,
            );
            directory.delete(path).unwrap();
        });
    }
| |
    // Benchmarks a full sequential decode of a 1000-doc store: the store is
    // written once up front, and each iteration iterates/decompresses all docs.
    #[bench]
    fn bench_store_decode(b: &mut Bencher) {
        let directory = RamDirectory::create();
        let path = Path::new("store");
        write_lorem_ipsum_store(
            directory.open_write(path).unwrap(),
            1_000,
            Compressor::default(),
            16_384,
            true,
        );
        let store_file = directory.open_read(path).unwrap();
        let store = StoreReader::open(store_file, 10).unwrap();
        b.iter(|| store.iter(None).collect::<Vec<_>>());
    }
| } |