blob: 934969bd23c6c5b9cff774f4fadd73ebbc277739 [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define C_LUCY_DOCREADER
#define C_LUCY_POLYDOCREADER
#define C_LUCY_DEFAULTDOCREADER
#include "Lucy/Util/ToolSet.h"
#include "Lucy/Index/DocReader.h"
#include "Lucy/Document/HitDoc.h"
#include "Lucy/Index/DocWriter.h"
#include "Lucy/Index/PolyReader.h"
#include "Lucy/Index/Segment.h"
#include "Lucy/Index/Snapshot.h"
#include "Lucy/Plan/Schema.h"
#include "Lucy/Store/Folder.h"
#include "Lucy/Store/InStream.h"
/* Initialize a DocReader.
 *
 * All arguments are forwarded unchanged to DataReader_init(), with `self`
 * viewed as a DataReader; the value it returns is handed back as a
 * DocReader*.
 */
DocReader*
DocReader_init(DocReader *self, Schema *schema, Folder *folder,
               Snapshot *snapshot, VArray *segments, int32_t seg_tick) {
    DataReader *base = (DataReader*)self;
    base = DataReader_init(base, schema, folder, snapshot, segments,
                           seg_tick);
    return (DocReader*)base;
}
/* Build a reader which combines several DocReaders.
 *
 * `self` is not consulted.  The result is a new PolyDocReader constructed
 * from `readers` and `offsets`, returned as a DocReader*.
 */
DocReader*
DocReader_aggregator(DocReader *self, VArray *readers, I32Array *offsets) {
    UNUSED_VAR(self);
    PolyDocReader *combined = PolyDocReader_new(readers, offsets);
    return (DocReader*)combined;
}
/* Constructor: create a blank POLYDOCREADER object and pass it, together
 * with `readers` and `offsets`, to PolyDocReader_init().
 */
PolyDocReader*
PolyDocReader_new(VArray *readers, I32Array *offsets) {
    return PolyDocReader_init(
               (PolyDocReader*)VTable_Make_Obj(POLYDOCREADER),
               readers, offsets);
}
/* Initialize a PolyDocReader.
 *
 * The parent initializer is invoked with no schema, folder, snapshot or
 * segments and a seg_tick of -1.  Each element of `readers` is run through
 * the CERTIFY check against DOCREADER, after which the object stores its
 * own references (via INCREF) to `readers` and `offsets`.
 *
 * Returns `self`.
 */
PolyDocReader*
PolyDocReader_init(PolyDocReader *self, VArray *readers, I32Array *offsets) {
    DocReader_init((DocReader*)self, NULL, NULL, NULL, NULL, -1);

    // Type-check every sub-reader before keeping the array.
    const uint32_t num_readers = VA_Get_Size(readers);
    for (uint32_t tick = 0; tick < num_readers; tick++) {
        CERTIFY(VA_Fetch(readers, tick), DOCREADER);
    }

    self->readers = (VArray*)INCREF(readers);
    self->offsets = (I32Array*)INCREF(offsets);
    return self;
}
/* Close every non-NULL sub-reader held in self->readers, then empty the
 * array.  Does nothing when self->readers is NULL.
 */
void
PolyDocReader_close(PolyDocReader *self) {
    VArray *sub_readers = self->readers;
    if (sub_readers == NULL) { return; }

    const uint32_t count = VA_Get_Size(sub_readers);
    for (uint32_t tick = 0; tick < count; tick++) {
        DocReader *sub_reader = (DocReader*)VA_Fetch(sub_readers, tick);
        if (sub_reader != NULL) {
            DocReader_Close(sub_reader);
        }
    }
    VA_Clear(sub_readers);
}
/* Destructor: drop this object's references to its reader array and its
 * offsets array (in that order), then invoke the superclass destructor.
 */
void
PolyDocReader_destroy(PolyDocReader *self) {
    VArray   *readers = self->readers;
    I32Array *offsets = self->offsets;
    DECREF(readers);
    DECREF(offsets);
    SUPER_DESTROY(self, POLYDOCREADER);
}
/* Fetch the document identified by `doc_id`.
 *
 * PolyReader_sub_tick() selects an index into self->offsets / self->readers
 * for `doc_id`.  The offset stored at that index is subtracted from `doc_id`
 * before asking the chosen sub-reader for the document, and the resulting
 * HitDoc then has its doc id set back to the original `doc_id`.
 *
 * Throws ERR ("Invalid doc_id") when no reader is stored at the selected
 * index.
 */
HitDoc*
PolyDocReader_fetch_doc(PolyDocReader *self, int32_t doc_id) {
    uint32_t   tick       = PolyReader_sub_tick(self->offsets, doc_id);
    int32_t    base       = I32Arr_Get(self->offsets, tick);
    DocReader *sub_reader = (DocReader*)VA_Fetch(self->readers, tick);
    HitDoc    *result     = NULL;

    if (sub_reader) {
        int32_t local_id = doc_id - base;
        result = DocReader_Fetch_Doc(sub_reader, local_id);
        HitDoc_Set_Doc_ID(result, doc_id);
    }
    else {
        THROW(ERR, "Invalid doc_id: %i32", doc_id);
    }

    return result;
}
/* Constructor: create a blank DEFAULTDOCREADER object and initialize it
 * with DefDocReader_init() using the supplied arguments.
 */
DefaultDocReader*
DefDocReader_new(Schema *schema, Folder *folder, Snapshot *snapshot,
                 VArray *segments, int32_t seg_tick) {
    return DefDocReader_init(
               (DefaultDocReader*)VTable_Make_Obj(DEFAULTDOCREADER),
               schema, folder, snapshot, segments, seg_tick);
}
/* Close and release both input streams.
 *
 * Each stream that is currently set is closed, has this object's reference
 * dropped, and is reset to NULL.  The data stream is handled first, then
 * the index stream.
 */
void
DefDocReader_close(DefaultDocReader *self) {
    InStream *dat_stream = self->dat_in;
    if (dat_stream) {
        InStream_Close(dat_stream);
        DECREF(dat_stream);
        self->dat_in = NULL;
    }

    InStream *ix_stream = self->ix_in;
    if (ix_stream) {
        InStream_Close(ix_stream);
        DECREF(ix_stream);
        self->ix_in = NULL;
    }
}
/* Destructor: drop this object's references to the index stream and the
 * data stream (in that order), then invoke the superclass destructor.
 */
void
DefDocReader_destroy(DefaultDocReader *self) {
    InStream *ix_stream  = self->ix_in;
    InStream *dat_stream = self->dat_in;
    DECREF(ix_stream);
    DECREF(dat_stream);
    SUPER_DESTROY(self, DEFAULTDOCREADER);
}
/* Initialize a DefaultDocReader for one segment.
 *
 * After running the parent initializer, the segment's "documents" metadata
 * is consulted.  When there is none, `self` is returned with no streams
 * opened.  Otherwise the "format" entry must be present and equal to
 * DocWriter_current_file_format, and -- provided "<seg_name>/documents.ix"
 * exists in `folder` -- both the index and data files are opened.
 *
 * Throws ERR when "format" is missing, older than the current format, or
 * otherwise different from it.  When a file cannot be opened, `self` is
 * released and the error reported by Err_get_error() is rethrown.
 *
 * Returns `self`.
 */
DefaultDocReader*
DefDocReader_init(DefaultDocReader *self, Schema *schema, Folder *folder,
                  Snapshot *snapshot, VArray *segments, int32_t seg_tick) {
    Hash    *metadata;
    Segment *segment;
    DocReader_init((DocReader*)self, schema, folder, snapshot, segments,
                   seg_tick);
    segment  = DefDocReader_Get_Segment(self);
    metadata = (Hash*)Seg_Fetch_Metadata_Str(segment, "documents", 9);

    if (metadata) {
        Obj *format = Hash_Fetch_Str(metadata, "format", 6);

        // Check format.  This happens before the file names are allocated
        // so that a THROW here cannot leak them.
        if (!format) { THROW(ERR, "Missing 'format' var"); }
        else {
            int64_t format_val = Obj_To_I64(format);
            if (format_val < DocWriter_current_file_format) {
                THROW(ERR, "Obsolete doc storage format %i64; "
                      "Index regeneration is required", format_val);
            }
            else if (format_val != DocWriter_current_file_format) {
                THROW(ERR, "Unsupported doc storage format: %i64", format_val);
            }
        }

        CharBuf *seg_name = Seg_Get_Name(segment);
        CharBuf *ix_file  = CB_newf("%o/documents.ix", seg_name);
        CharBuf *dat_file = CB_newf("%o/documents.dat", seg_name);

        // Get streams.  The data file is only attempted once the index
        // file has been opened successfully.
        if (Folder_Exists(folder, ix_file)) {
            self->ix_in = Folder_Open_In(folder, ix_file);
            if (self->ix_in) {
                self->dat_in = Folder_Open_In(folder, dat_file);
            }
            if (!self->ix_in || !self->dat_in) {
                // Hold on to the error before tearing anything down.
                Err *error = (Err*)INCREF(Err_get_error());
                DECREF(ix_file);
                DECREF(dat_file);
                DECREF(self);
                RETHROW(error);
            }
        }

        DECREF(ix_file);
        DECREF(dat_file);
    }

    return self;
}
/* Read the raw stored record for `doc_id` into `buffer`.
 *
 * Two consecutive 64-bit values are read from the index stream starting at
 * byte position doc_id * 8; they are taken as the start and end positions
 * of the record within the data stream.  `buffer` is grown to hold the
 * record, filled from the data stream, and has its size set to the record
 * length.
 *
 * Throws ERR when the positions read from the index are inconsistent
 * (negative start, end before start, or a length that does not fit in
 * size_t), rather than passing a bogus length on to the buffer and stream.
 */
void
DefDocReader_read_record(DefaultDocReader *self, ByteBuf *buffer,
                         int32_t doc_id) {
    // Find start and length of variable length record.
    InStream_Seek(self->ix_in, (int64_t)doc_id * 8);
    int64_t start = InStream_Read_I64(self->ix_in);
    int64_t end   = InStream_Read_I64(self->ix_in);

    // Validate before subtracting: with start >= 0 and end >= start the
    // difference cannot overflow or go negative.
    if (start < 0 || end < start) {
        THROW(ERR, "Invalid record bounds for doc %i32: start %i64, end %i64",
              doc_id, start, end);
    }
    int64_t length = end - start;
    size_t  size   = (size_t)length;
    if ((int64_t)size != length) {
        // Only possible where size_t is narrower than 64 bits.
        THROW(ERR, "Record too large for doc %i32: %i64 bytes",
              doc_id, length);
    }

    // Read in the record.
    char *buf = BB_Grow(buffer, size);
    InStream_Seek(self->dat_in, start);
    InStream_Read_Bytes(self->dat_in, buf, size);
    BB_Set_Size(buffer, size);
}