| /* Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #define C_LUCY_BACKGROUNDMERGER |
| #include "Lucy/Util/ToolSet.h" |
| |
| #include "Lucy/Index/BackgroundMerger.h" |
| #include "Lucy/Index/DeletionsReader.h" |
| #include "Lucy/Index/DeletionsWriter.h" |
| #include "Lucy/Index/FilePurger.h" |
| #include "Lucy/Index/IndexManager.h" |
| #include "Lucy/Index/PolyReader.h" |
| #include "Lucy/Index/Segment.h" |
| #include "Lucy/Index/SegReader.h" |
| #include "Lucy/Index/Snapshot.h" |
| #include "Lucy/Index/SegWriter.h" |
| #include "Lucy/Plan/Architecture.h" |
| #include "Lucy/Plan/Schema.h" |
| #include "Lucy/Search/Matcher.h" |
| #include "Lucy/Store/Folder.h" |
| #include "Lucy/Store/FSFolder.h" |
| #include "Lucy/Store/Lock.h" |
| #include "Lucy/Util/IndexFileNames.h" |
| #include "Lucy/Util/Json.h" |
| |
| // Verify a Folder or derive an FSFolder from a CharBuf path. |
| static Folder* |
| S_init_folder(Obj *index); |
| |
| // Grab the write lock and store it in self. |
| static void |
| S_obtain_write_lock(BackgroundMerger *self); |
| |
| // Grab the merge lock and store it in self. |
| static void |
| S_obtain_merge_lock(BackgroundMerger *self); |
| |
| // Release the write lock - if it's there. |
| static void |
| S_release_write_lock(BackgroundMerger *self); |
| |
| // Release the merge lock - if it's there. |
| static void |
| S_release_merge_lock(BackgroundMerger *self); |
| |
| BackgroundMerger* |
| BGMerger_new(Obj *index, IndexManager *manager) { |
| BackgroundMerger *self |
| = (BackgroundMerger*)VTable_Make_Obj(BACKGROUNDMERGER); |
| return BGMerger_init(self, index, manager); |
| } |
| |
| BackgroundMerger* |
| BGMerger_init(BackgroundMerger *self, Obj *index, IndexManager *manager) { |
| Folder *folder = S_init_folder(index); |
| |
| // Init. |
| self->optimize = false; |
| self->prepared = false; |
| self->needs_commit = false; |
| self->snapfile = NULL; |
| self->doc_maps = Hash_new(0); |
| |
| // Assign. |
| self->folder = folder; |
| if (manager) { |
| self->manager = (IndexManager*)INCREF(manager); |
| } |
| else { |
| self->manager = IxManager_new(NULL, NULL); |
| IxManager_Set_Write_Lock_Timeout(self->manager, 10000); |
| } |
| IxManager_Set_Folder(self->manager, folder); |
| |
| // Obtain write lock (which we'll only hold briefly), then merge lock. |
| S_obtain_write_lock(self); |
| if (!self->write_lock) { |
| DECREF(self); |
| RETHROW(INCREF(Err_get_error())); |
| } |
| S_obtain_merge_lock(self); |
| if (!self->merge_lock) { |
| DECREF(self); |
| RETHROW(INCREF(Err_get_error())); |
| } |
| |
| // Find the latest snapshot. If there's no index content, bail early. |
| self->snapshot = Snapshot_Read_File(Snapshot_new(), folder, NULL); |
| if (!Snapshot_Get_Path(self->snapshot)) { |
| S_release_write_lock(self); |
| S_release_merge_lock(self); |
| return self; |
| } |
| |
| // Create FilePurger. Zap detritus from previous sessions. |
| self->file_purger = FilePurger_new(folder, self->snapshot, self->manager); |
| FilePurger_Purge(self->file_purger); |
| |
| // Open a PolyReader, passing in the IndexManager so we get a read lock on |
| // the Snapshot's files -- so that Indexers don't zap our files while |
| // we're operating in the background. |
| self->polyreader = PolyReader_open((Obj*)folder, NULL, self->manager); |
| |
| // Clone the PolyReader's schema. |
| { |
| Hash *dump = Schema_Dump(PolyReader_Get_Schema(self->polyreader)); |
| self->schema = (Schema*)CERTIFY(VTable_Load_Obj(SCHEMA, (Obj*)dump), |
| SCHEMA); |
| DECREF(dump); |
| } |
| |
| // Create new Segment. |
| { |
| int64_t new_seg_num |
| = IxManager_Highest_Seg_Num(self->manager, self->snapshot) + 1; |
| VArray *fields = Schema_All_Fields(self->schema); |
| uint32_t i, max; |
| self->segment = Seg_new(new_seg_num); |
| for (i = 0, max = VA_Get_Size(fields); i < max; i++) { |
| Seg_Add_Field(self->segment, (CharBuf*)VA_Fetch(fields, i)); |
| } |
| DECREF(fields); |
| } |
| |
| // Our "cutoff" is the segment this BackgroundMerger will write. Now that |
| // we've determined the cutoff, write the merge data file. |
| self->cutoff = Seg_Get_Number(self->segment); |
| IxManager_Write_Merge_Data(self->manager, self->cutoff); |
| |
| /* Create the SegWriter but hold off on preparing the new segment |
| * directory -- because if we don't need to merge any segments we don't |
| * need it. (We've reserved the dir by plopping down the merge.json |
| * file.) */ |
| self->seg_writer = SegWriter_new(self->schema, self->snapshot, |
| self->segment, self->polyreader); |
| |
| // Grab a local ref to the DeletionsWriter. |
| self->del_writer |
| = (DeletionsWriter*)INCREF(SegWriter_Get_Del_Writer(self->seg_writer)); |
| |
| // Release the write lock. Now new Indexers can start while we work in |
| // the background. |
| S_release_write_lock(self); |
| |
| return self; |
| } |
| |
| void |
| BGMerger_destroy(BackgroundMerger *self) { |
| S_release_merge_lock(self); |
| S_release_write_lock(self); |
| DECREF(self->schema); |
| DECREF(self->folder); |
| DECREF(self->segment); |
| DECREF(self->manager); |
| DECREF(self->polyreader); |
| DECREF(self->del_writer); |
| DECREF(self->snapshot); |
| DECREF(self->seg_writer); |
| DECREF(self->file_purger); |
| DECREF(self->write_lock); |
| DECREF(self->snapfile); |
| DECREF(self->doc_maps); |
| SUPER_DESTROY(self, BACKGROUNDMERGER); |
| } |
| |
| static Folder* |
| S_init_folder(Obj *index) { |
| Folder *folder = NULL; |
| |
| // Validate or acquire a Folder. |
| if (Obj_Is_A(index, FOLDER)) { |
| folder = (Folder*)INCREF(index); |
| } |
| else if (Obj_Is_A(index, CHARBUF)) { |
| folder = (Folder*)FSFolder_new((CharBuf*)index); |
| } |
| else { |
| THROW(ERR, "Invalid type for 'index': %o", Obj_Get_Class_Name(index)); |
| } |
| |
| // Validate index directory. |
| if (!Folder_Check(folder)) { |
| THROW(ERR, "Folder '%o' failed check", Folder_Get_Path(folder)); |
| } |
| |
| return folder; |
| } |
| |
| void |
| BGMerger_optimize(BackgroundMerger *self) { |
| self->optimize = true; |
| } |
| |
| static uint32_t |
| S_maybe_merge(BackgroundMerger *self) { |
| VArray *to_merge = IxManager_Recycle(self->manager, self->polyreader, |
| self->del_writer, 0, self->optimize); |
| int32_t num_to_merge = VA_Get_Size(to_merge); |
| uint32_t i, max; |
| |
| // There's no point in merging one segment if it has no deletions, because |
| // we'd just be rewriting it. */ |
| if (num_to_merge == 1) { |
| SegReader *seg_reader = (SegReader*)VA_Fetch(to_merge, 0); |
| if (!SegReader_Del_Count(seg_reader)) { |
| DECREF(to_merge); |
| return 0; |
| } |
| } |
| else if (num_to_merge == 0) { |
| DECREF(to_merge); |
| return 0; |
| } |
| |
| // Now that we're sure we're writing a new segment, prep the seg dir. |
| SegWriter_Prep_Seg_Dir(self->seg_writer); |
| |
| // Consolidate segments. |
| for (i = 0, max = num_to_merge; i < max; i++) { |
| SegReader *seg_reader = (SegReader*)VA_Fetch(to_merge, i); |
| CharBuf *seg_name = SegReader_Get_Seg_Name(seg_reader); |
| int64_t doc_count = Seg_Get_Count(self->segment); |
| Matcher *deletions |
| = DelWriter_Seg_Deletions(self->del_writer, seg_reader); |
| I32Array *doc_map = DelWriter_Generate_Doc_Map( |
| self->del_writer, deletions, |
| SegReader_Doc_Max(seg_reader), |
| (int32_t)doc_count); |
| |
| Hash_Store(self->doc_maps, (Obj*)seg_name, (Obj*)doc_map); |
| SegWriter_Merge_Segment(self->seg_writer, seg_reader, doc_map); |
| DECREF(deletions); |
| } |
| |
| DECREF(to_merge); |
| return num_to_merge; |
| } |
| |
| static bool_t |
| S_merge_updated_deletions(BackgroundMerger *self) { |
| Hash *updated_deletions = NULL; |
| |
| { |
| PolyReader *new_polyreader |
| = PolyReader_open((Obj*)self->folder, NULL, NULL); |
| VArray *new_seg_readers |
| = PolyReader_Get_Seg_Readers(new_polyreader); |
| VArray *old_seg_readers |
| = PolyReader_Get_Seg_Readers(self->polyreader); |
| Hash *new_segs = Hash_new(VA_Get_Size(new_seg_readers)); |
| uint32_t i, max; |
| |
| for (i = 0, max = VA_Get_Size(new_seg_readers); i < max; i++) { |
| SegReader *seg_reader = (SegReader*)VA_Fetch(new_seg_readers, i); |
| CharBuf *seg_name = SegReader_Get_Seg_Name(seg_reader); |
| Hash_Store(new_segs, (Obj*)seg_name, INCREF(seg_reader)); |
| } |
| |
| for (i = 0, max = VA_Get_Size(old_seg_readers); i < max; i++) { |
| SegReader *seg_reader = (SegReader*)VA_Fetch(old_seg_readers, i); |
| CharBuf *seg_name = SegReader_Get_Seg_Name(seg_reader); |
| |
| // If this segment was merged away... |
| if (Hash_Fetch(self->doc_maps, (Obj*)seg_name)) { |
| SegReader *new_seg_reader |
| = (SegReader*)CERTIFY( |
| Hash_Fetch(new_segs, (Obj*)seg_name), |
| SEGREADER); |
| int32_t old_del_count = SegReader_Del_Count(seg_reader); |
| int32_t new_del_count = SegReader_Del_Count(new_seg_reader); |
| // ... were any new deletions applied against it? |
| if (old_del_count != new_del_count) { |
| DeletionsReader *del_reader |
| = (DeletionsReader*)SegReader_Obtain( |
| new_seg_reader, |
| VTable_Get_Name(DELETIONSREADER)); |
| if (!updated_deletions) { |
| updated_deletions = Hash_new(max); |
| } |
| Hash_Store(updated_deletions, (Obj*)seg_name, |
| (Obj*)DelReader_Iterator(del_reader)); |
| } |
| } |
| } |
| |
| DECREF(new_polyreader); |
| DECREF(new_segs); |
| } |
| |
| if (!updated_deletions) { |
| return false; |
| } |
| else { |
| PolyReader *merge_polyreader |
| = PolyReader_open((Obj*)self->folder, self->snapshot, NULL); |
| VArray *merge_seg_readers |
| = PolyReader_Get_Seg_Readers(merge_polyreader); |
| Snapshot *latest_snapshot |
| = Snapshot_Read_File(Snapshot_new(), self->folder, NULL); |
| int64_t new_seg_num |
| = IxManager_Highest_Seg_Num(self->manager, latest_snapshot) + 1; |
| Segment *new_segment = Seg_new(new_seg_num); |
| SegWriter *seg_writer = SegWriter_new(self->schema, self->snapshot, |
| new_segment, merge_polyreader); |
| DeletionsWriter *del_writer = SegWriter_Get_Del_Writer(seg_writer); |
| int64_t merge_seg_num = Seg_Get_Number(self->segment); |
| uint32_t seg_tick = I32_MAX; |
| int32_t offset = I32_MAX; |
| CharBuf *seg_name = NULL; |
| Matcher *deletions = NULL; |
| uint32_t i, max; |
| |
| SegWriter_Prep_Seg_Dir(seg_writer); |
| |
| for (i = 0, max = VA_Get_Size(merge_seg_readers); i < max; i++) { |
| SegReader *seg_reader |
| = (SegReader*)VA_Fetch(merge_seg_readers, i); |
| if (SegReader_Get_Seg_Num(seg_reader) == merge_seg_num) { |
| I32Array *offsets = PolyReader_Offsets(merge_polyreader); |
| seg_tick = i; |
| offset = I32Arr_Get(offsets, seg_tick); |
| DECREF(offsets); |
| } |
| } |
| if (offset == I32_MAX) { THROW(ERR, "Failed sanity check"); } |
| |
| Hash_Iterate(updated_deletions); |
| while (Hash_Next(updated_deletions, |
| (Obj**)&seg_name, (Obj**)&deletions) |
| ) { |
| I32Array *doc_map |
| = (I32Array*)CERTIFY( |
| Hash_Fetch(self->doc_maps, (Obj*)seg_name), |
| I32ARRAY); |
| int32_t del; |
| while (0 != (del = Matcher_Next(deletions))) { |
| // Find the slot where the deleted doc resides in the |
| // rewritten segment. If the doc was already deleted when we |
| // were merging, do nothing. |
| int32_t remapped = I32Arr_Get(doc_map, del); |
| if (remapped) { |
| // It's a new deletion, so carry it forward and zap it in |
| // the rewritten segment. |
| DelWriter_Delete_By_Doc_ID(del_writer, remapped + offset); |
| } |
| } |
| } |
| |
| // Finish the segment and clean up. |
| DelWriter_Finish(del_writer); |
| SegWriter_Finish(seg_writer); |
| DECREF(seg_writer); |
| DECREF(new_segment); |
| DECREF(latest_snapshot); |
| DECREF(merge_polyreader); |
| DECREF(updated_deletions); |
| } |
| |
| return true; |
| } |
| |
| void |
| BGMerger_prepare_commit(BackgroundMerger *self) { |
| VArray *seg_readers = PolyReader_Get_Seg_Readers(self->polyreader); |
| uint32_t num_seg_readers = VA_Get_Size(seg_readers); |
| uint32_t segs_merged = 0; |
| |
| if (self->prepared) { |
| THROW(ERR, "Can't call Prepare_Commit() more than once"); |
| } |
| |
| // Maybe merge existing index data. |
| if (num_seg_readers) { |
| segs_merged = S_maybe_merge(self); |
| } |
| |
| if (!segs_merged) { |
| // Nothing merged. Leave self->needs_commit false and bail out. |
| self->prepared = true; |
| return; |
| } |
| // Finish the segment and write a new snapshot file. |
| else { |
| Folder *folder = self->folder; |
| Snapshot *snapshot = self->snapshot; |
| |
| // Write out new deletions. |
| if (DelWriter_Updated(self->del_writer)) { |
| // Only write out if they haven't all been applied. |
| if (segs_merged != num_seg_readers) { |
| DelWriter_Finish(self->del_writer); |
| } |
| } |
| |
| // Finish the segment. |
| SegWriter_Finish(self->seg_writer); |
| |
| // Grab the write lock. |
| S_obtain_write_lock(self); |
| if (!self->write_lock) { |
| RETHROW(INCREF(Err_get_error())); |
| } |
| |
| // Write temporary snapshot file. |
| DECREF(self->snapfile); |
| self->snapfile = IxManager_Make_Snapshot_Filename(self->manager); |
| CB_Cat_Trusted_Str(self->snapfile, ".temp", 5); |
| Folder_Delete(folder, self->snapfile); |
| Snapshot_Write_File(snapshot, folder, self->snapfile); |
| |
| // Determine whether the index has been updated while this background |
| // merge process was running. |
| { |
| CharBuf *start_snapfile |
| = Snapshot_Get_Path(PolyReader_Get_Snapshot(self->polyreader)); |
| Snapshot *latest_snapshot |
| = Snapshot_Read_File(Snapshot_new(), self->folder, NULL); |
| CharBuf *latest_snapfile = Snapshot_Get_Path(latest_snapshot); |
| bool_t index_updated |
| = !CB_Equals(start_snapfile, (Obj*)latest_snapfile); |
| |
| if (index_updated) { |
| /* See if new deletions have been applied since this |
| * background merge process started against any of the |
| * segments we just merged away. If that's true, we need to |
| * write another segment which applies the deletions against |
| * the new composite segment. |
| */ |
| S_merge_updated_deletions(self); |
| |
| // Add the fresh content to our snapshot. (It's important to |
| // run this AFTER S_merge_updated_deletions, because otherwise |
| // we couldn't tell whether the deletion counts changed.) |
| { |
| VArray *files = Snapshot_List(latest_snapshot); |
| uint32_t i, max; |
| for (i = 0, max = VA_Get_Size(files); i < max; i++) { |
| CharBuf *file = (CharBuf*)VA_Fetch(files, i); |
| if (CB_Starts_With_Str(file, "seg_", 4)) { |
| int64_t gen = (int64_t)IxFileNames_extract_gen(file); |
| if (gen > self->cutoff) { |
| Snapshot_Add_Entry(self->snapshot, file); |
| } |
| } |
| } |
| DECREF(files); |
| } |
| |
| // Since the snapshot content has changed, we need to rewrite |
| // it. |
| Folder_Delete(folder, self->snapfile); |
| Snapshot_Write_File(snapshot, folder, self->snapfile); |
| } |
| |
| DECREF(latest_snapshot); |
| } |
| |
| self->needs_commit = true; |
| } |
| |
| // Close reader, so that we can delete its files if appropriate. |
| PolyReader_Close(self->polyreader); |
| |
| self->prepared = true; |
| } |
| |
| void |
| BGMerger_commit(BackgroundMerger *self) { |
| // Safety check. |
| if (!self->merge_lock) { |
| THROW(ERR, "Can't call commit() more than once"); |
| } |
| |
| if (!self->prepared) { |
| BGMerger_Prepare_Commit(self); |
| } |
| |
| if (self->needs_commit) { |
| bool_t success = false; |
| CharBuf *temp_snapfile = CB_Clone(self->snapfile); |
| |
| // Rename temp snapshot file. |
| CB_Chop(self->snapfile, sizeof(".temp") - 1); |
| success = Folder_Hard_Link(self->folder, temp_snapfile, |
| self->snapfile); |
| Snapshot_Set_Path(self->snapshot, self->snapfile); |
| if (!success) { |
| CharBuf *mess = CB_newf("Can't create hard link from %o to %o", |
| temp_snapfile, self->snapfile); |
| DECREF(temp_snapfile); |
| Err_throw_mess(ERR, mess); |
| } |
| if (!Folder_Delete(self->folder, temp_snapfile)) { |
| CharBuf *mess = CB_newf("Can't delete %o", temp_snapfile); |
| DECREF(temp_snapfile); |
| Err_throw_mess(ERR, mess); |
| } |
| DECREF(temp_snapfile); |
| } |
| |
| // Release the merge lock and remove the merge data file. |
| S_release_merge_lock(self); |
| IxManager_Remove_Merge_Data(self->manager); |
| |
| if (self->needs_commit) { |
| // Purge obsolete files. |
| FilePurger_Purge(self->file_purger); |
| } |
| |
| // Release the write lock. |
| S_release_write_lock(self); |
| } |
| |
| static void |
| S_obtain_write_lock(BackgroundMerger *self) { |
| Lock *write_lock = IxManager_Make_Write_Lock(self->manager); |
| Lock_Clear_Stale(write_lock); |
| if (Lock_Obtain(write_lock)) { |
| // Only assign if successful, otherwise DESTROY unlocks -- bad! |
| self->write_lock = write_lock; |
| } |
| else { |
| DECREF(write_lock); |
| } |
| } |
| |
| static void |
| S_obtain_merge_lock(BackgroundMerger *self) { |
| Lock *merge_lock = IxManager_Make_Merge_Lock(self->manager); |
| Lock_Clear_Stale(merge_lock); |
| if (Lock_Obtain(merge_lock)) { |
| // Only assign if successful, same rationale as above. |
| self->merge_lock = merge_lock; |
| } |
| else { |
| // We can't get the merge lock, so it seems there must be another |
| // BackgroundMerger running. |
| DECREF(merge_lock); |
| } |
| } |
| |
| static void |
| S_release_write_lock(BackgroundMerger *self) { |
| if (self->write_lock) { |
| Lock_Release(self->write_lock); |
| DECREF(self->write_lock); |
| self->write_lock = NULL; |
| } |
| } |
| |
| static void |
| S_release_merge_lock(BackgroundMerger *self) { |
| if (self->merge_lock) { |
| Lock_Release(self->merge_lock); |
| DECREF(self->merge_lock); |
| self->merge_lock = NULL; |
| } |
| } |
| |
| |