blob: 6a0dd1885479e2476d82cefa984ff2dccf3eab8b [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define C_LUCY_SEGWRITER
#include "Lucy/Util/ToolSet.h"
#include "Lucy/Index/SegWriter.h"
#include "Lucy/Document/Doc.h"
#include "Lucy/Plan/Schema.h"
#include "Lucy/Store/DirHandle.h"
#include "Lucy/Store/Folder.h"
#include "Lucy/Index/DeletionsWriter.h"
#include "Lucy/Index/Inverter.h"
#include "Lucy/Index/PolyReader.h"
#include "Lucy/Index/Segment.h"
#include "Lucy/Index/SegReader.h"
#include "Lucy/Index/Snapshot.h"
#include "Lucy/Plan/Architecture.h"
SegWriter*
SegWriter_new(Schema *schema, Snapshot *snapshot, Segment *segment,
PolyReader *polyreader) {
SegWriter *self = (SegWriter*)VTable_Make_Obj(SEGWRITER);
return SegWriter_init(self, schema, snapshot, segment, polyreader);
}
SegWriter*
SegWriter_init(SegWriter *self, Schema *schema, Snapshot *snapshot,
Segment *segment, PolyReader *polyreader) {
Architecture *arch = Schema_Get_Architecture(schema);
DataWriter_init((DataWriter*)self, schema, snapshot, segment, polyreader);
self->by_api = Hash_new(0);
self->inverter = Inverter_new(schema, segment);
self->writers = VA_new(16);
Arch_Init_Seg_Writer(arch, self);
return self;
}
void
SegWriter_destroy(SegWriter *self) {
DECREF(self->inverter);
DECREF(self->writers);
DECREF(self->by_api);
DECREF(self->del_writer);
SUPER_DESTROY(self, SEGWRITER);
}
void
SegWriter_register(SegWriter *self, const CharBuf *api,
DataWriter *component) {
CERTIFY(component, DATAWRITER);
if (Hash_Fetch(self->by_api, (Obj*)api)) {
THROW(ERR, "API %o already registered", api);
}
Hash_Store(self->by_api, (Obj*)api, (Obj*)component);
}
Obj*
SegWriter_fetch(SegWriter *self, const CharBuf *api) {
return Hash_Fetch(self->by_api, (Obj*)api);
}
void
SegWriter_add_writer(SegWriter *self, DataWriter *writer) {
VA_Push(self->writers, (Obj*)writer);
}
void
SegWriter_prep_seg_dir(SegWriter *self) {
Folder *folder = SegWriter_Get_Folder(self);
CharBuf *seg_name = Seg_Get_Name(self->segment);
// Clear stale segment files from crashed indexing sessions.
if (Folder_Exists(folder, seg_name)) {
bool_t result = Folder_Delete_Tree(folder, seg_name);
if (!result) {
THROW(ERR, "Couldn't completely remove '%o'", seg_name);
}
}
// Create the segment directory.
bool_t result = Folder_MkDir(folder, seg_name);
if (!result) { RETHROW(INCREF(Err_get_error())); }
}
void
SegWriter_add_doc(SegWriter *self, Doc *doc, float boost) {
int32_t doc_id = (int32_t)Seg_Increment_Count(self->segment, 1);
Inverter_Invert_Doc(self->inverter, doc);
Inverter_Set_Boost(self->inverter, boost);
SegWriter_Add_Inverted_Doc(self, self->inverter, doc_id);
}
void
SegWriter_add_inverted_doc(SegWriter *self, Inverter *inverter,
int32_t doc_id) {
for (uint32_t i = 0, max = VA_Get_Size(self->writers); i < max; i++) {
DataWriter *writer = (DataWriter*)VA_Fetch(self->writers, i);
DataWriter_Add_Inverted_Doc(writer, inverter, doc_id);
}
}
// Adjust current doc id. We create our own doc_count rather than rely on
// SegReader's number because the DeletionsWriter and the SegReader are
// probably out of sync.
static void
S_adjust_doc_id(SegWriter *self, SegReader *reader, I32Array *doc_map) {
uint32_t doc_count = SegReader_Doc_Max(reader);
for (uint32_t i = 1, max = I32Arr_Get_Size(doc_map); i < max; i++) {
if (I32Arr_Get(doc_map, i) == 0) { doc_count--; }
}
Seg_Increment_Count(self->segment, doc_count);
}
void
SegWriter_add_segment(SegWriter *self, SegReader *reader, I32Array *doc_map) {
// Bulk add the slab of documents to the various writers.
for (uint32_t i = 0, max = VA_Get_Size(self->writers); i < max; i++) {
DataWriter *writer = (DataWriter*)VA_Fetch(self->writers, i);
DataWriter_Add_Segment(writer, reader, doc_map);
}
// Bulk add the segment to the DeletionsWriter, so that it can merge
// previous segment files as necessary.
DelWriter_Add_Segment(self->del_writer, reader, doc_map);
// Adust the document id.
S_adjust_doc_id(self, reader, doc_map);
}
void
SegWriter_merge_segment(SegWriter *self, SegReader *reader,
I32Array *doc_map) {
Snapshot *snapshot = SegWriter_Get_Snapshot(self);
CharBuf *seg_name = Seg_Get_Name(SegReader_Get_Segment(reader));
// Have all the sub-writers merge the segment.
for (uint32_t i = 0, max = VA_Get_Size(self->writers); i < max; i++) {
DataWriter *writer = (DataWriter*)VA_Fetch(self->writers, i);
DataWriter_Merge_Segment(writer, reader, doc_map);
}
DelWriter_Merge_Segment(self->del_writer, reader, doc_map);
// Remove seg directory from snapshot.
Snapshot_Delete_Entry(snapshot, seg_name);
// Adust the document id.
S_adjust_doc_id(self, reader, doc_map);
}
void
SegWriter_delete_segment(SegWriter *self, SegReader *reader) {
Snapshot *snapshot = SegWriter_Get_Snapshot(self);
CharBuf *seg_name = Seg_Get_Name(SegReader_Get_Segment(reader));
// Have all the sub-writers delete the segment.
for (uint32_t i = 0, max = VA_Get_Size(self->writers); i < max; i++) {
DataWriter *writer = (DataWriter*)VA_Fetch(self->writers, i);
DataWriter_Delete_Segment(writer, reader);
}
DelWriter_Delete_Segment(self->del_writer, reader);
// Remove seg directory from snapshot.
Snapshot_Delete_Entry(snapshot, seg_name);
}
void
SegWriter_finish(SegWriter *self) {
CharBuf *seg_name = Seg_Get_Name(self->segment);
// Finish off children.
for (uint32_t i = 0, max = VA_Get_Size(self->writers); i < max; i++) {
DataWriter *writer = (DataWriter*)VA_Fetch(self->writers, i);
DataWriter_Finish(writer);
}
// Write segment metadata and add the segment directory to the snapshot.
Snapshot *snapshot = SegWriter_Get_Snapshot(self);
CharBuf *segmeta_filename = CB_newf("%o/segmeta.json", seg_name);
Seg_Write_File(self->segment, self->folder);
Snapshot_Add_Entry(snapshot, seg_name);
DECREF(segmeta_filename);
// Collapse segment files into compound file.
Folder_Consolidate(self->folder, seg_name);
}
void
SegWriter_add_data_writer(SegWriter *self, DataWriter *writer) {
VA_Push(self->writers, (Obj*)writer);
}
void
SegWriter_set_del_writer(SegWriter *self, DeletionsWriter *del_writer) {
DECREF(self->del_writer);
self->del_writer = (DeletionsWriter*)INCREF(del_writer);
}
DeletionsWriter*
SegWriter_get_del_writer(SegWriter *self) {
return self->del_writer;
}