blob: 08606b0c8d1049ea62ae5ef06ef9ee4c5f7058cf [file] [log] [blame]
/* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#define C_LUCY_POLYREADER
#include "Lucy/Util/ToolSet.h"
#include "Lucy/Index/PolyReader.h"
#include "Lucy/Document/HitDoc.h"
#include "Lucy/Index/DeletionsReader.h"
#include "Lucy/Index/IndexManager.h"
#include "Lucy/Index/Segment.h"
#include "Lucy/Index/SegReader.h"
#include "Lucy/Index/Snapshot.h"
#include "Lucy/Plan/FieldType.h"
#include "Lucy/Plan/Schema.h"
#include "Lucy/Store/Folder.h"
#include "Lucy/Store/FSFolder.h"
#include "Lucy/Store/Lock.h"
#include "Lucy/Util/Json.h"
#include "Lucy/Util/IndexFileNames.h"
#include "Lucy/Util/StringHelper.h"
// Obtain/release read locks and commit locks. If self->manager is
// NULL, do nothing.
static void
S_obtain_read_lock(PolyReader *self, const CharBuf *snapshot_filename);
static void
S_obtain_deletion_lock(PolyReader *self);
static void
S_release_read_lock(PolyReader *self);
static void
S_release_deletion_lock(PolyReader *self);
static Folder*
S_derive_folder(Obj *index);
PolyReader*
PolyReader_new(Schema *schema, Folder *folder, Snapshot *snapshot,
IndexManager *manager, VArray *sub_readers) {
PolyReader *self = (PolyReader*)VTable_Make_Obj(POLYREADER);
return PolyReader_init(self, schema, folder, snapshot, manager,
sub_readers);
}
PolyReader*
PolyReader_open(Obj *index, Snapshot *snapshot, IndexManager *manager) {
PolyReader *self = (PolyReader*)VTable_Make_Obj(POLYREADER);
return PolyReader_do_open(self, index, snapshot, manager);
}
static Obj*
S_first_non_null(VArray *array) {
uint32_t i, max;
for (i = 0, max = VA_Get_Size(array); i < max; i++) {
Obj *thing = VA_Fetch(array, i);
if (thing) { return thing; }
}
return NULL;
}
static void
S_init_sub_readers(PolyReader *self, VArray *sub_readers) {
uint32_t i;
uint32_t num_sub_readers = VA_Get_Size(sub_readers);
int32_t *starts = (int32_t*)MALLOCATE(num_sub_readers * sizeof(int32_t));
Hash *data_readers = Hash_new(0);
DECREF(self->sub_readers);
DECREF(self->offsets);
self->sub_readers = (VArray*)INCREF(sub_readers);
// Accumulate doc_max, subreader start offsets, and DataReaders.
self->doc_max = 0;
for (i = 0; i < num_sub_readers; i++) {
SegReader *seg_reader = (SegReader*)VA_Fetch(sub_readers, i);
Hash *components = SegReader_Get_Components(seg_reader);
CharBuf *api;
DataReader *component;
starts[i] = self->doc_max;
self->doc_max += SegReader_Doc_Max(seg_reader);
Hash_Iterate(components);
while (Hash_Next(components, (Obj**)&api, (Obj**)&component)) {
VArray *readers = (VArray*)Hash_Fetch(data_readers, (Obj*)api);
if (!readers) {
readers = VA_new(num_sub_readers);
Hash_Store(data_readers, (Obj*)api, (Obj*)readers);
}
VA_Store(readers, i, INCREF(component));
}
}
self->offsets = I32Arr_new_steal(starts, num_sub_readers);
{
CharBuf *api;
VArray *readers;
Hash_Iterate(data_readers);
while (Hash_Next(data_readers, (Obj**)&api, (Obj**)&readers)) {
DataReader *datareader = (DataReader*)CERTIFY(
S_first_non_null(readers),
DATAREADER);
DataReader *aggregator
= DataReader_Aggregator(datareader, readers, self->offsets);
if (aggregator) {
CERTIFY(aggregator, DATAREADER);
Hash_Store(self->components, (Obj*)api, (Obj*)aggregator);
}
}
}
DECREF(data_readers);
{
DeletionsReader *del_reader
= (DeletionsReader*)Hash_Fetch(
self->components, (Obj*)VTable_Get_Name(DELETIONSREADER));
self->del_count = del_reader ? DelReader_Del_Count(del_reader) : 0;
}
}
PolyReader*
PolyReader_init(PolyReader *self, Schema *schema, Folder *folder,
Snapshot *snapshot, IndexManager *manager,
VArray *sub_readers) {
self->doc_max = 0;
self->del_count = 0;
if (sub_readers) {
uint32_t num_segs = VA_Get_Size(sub_readers);
VArray *segments = VA_new(num_segs);
uint32_t i;
for (i = 0; i < num_segs; i++) {
SegReader *seg_reader
= (SegReader*)CERTIFY(VA_Fetch(sub_readers, i), SEGREADER);
VA_Push(segments, INCREF(SegReader_Get_Segment(seg_reader)));
}
IxReader_init((IndexReader*)self, schema, folder, snapshot,
segments, -1, manager);
DECREF(segments);
S_init_sub_readers(self, sub_readers);
}
else {
IxReader_init((IndexReader*)self, schema, folder, snapshot,
NULL, -1, manager);
self->sub_readers = VA_new(0);
self->offsets = I32Arr_new_steal(NULL, 0);
}
return self;
}
void
PolyReader_close(PolyReader *self) {
PolyReader_close_t super_close
= (PolyReader_close_t)SUPER_METHOD(POLYREADER, PolyReader, Close);
uint32_t i, max;
for (i = 0, max = VA_Get_Size(self->sub_readers); i < max; i++) {
SegReader *seg_reader = (SegReader*)VA_Fetch(self->sub_readers, i);
SegReader_Close(seg_reader);
}
super_close(self);
}
void
PolyReader_destroy(PolyReader *self) {
DECREF(self->sub_readers);
DECREF(self->offsets);
SUPER_DESTROY(self, POLYREADER);
}
Obj*
S_try_open_elements(PolyReader *self) {
VArray *files = Snapshot_List(self->snapshot);
Folder *folder = PolyReader_Get_Folder(self);
uint32_t num_segs = 0;
uint64_t latest_schema_gen = 0;
CharBuf *schema_file = NULL;
VArray *segments;
uint32_t i, max;
// Find schema file, count segments.
for (i = 0, max = VA_Get_Size(files); i < max; i++) {
CharBuf *entry = (CharBuf*)VA_Fetch(files, i);
if (Seg_valid_seg_name(entry)) {
num_segs++;
}
else if (CB_Starts_With_Str(entry, "schema_", 7)
&& CB_Ends_With_Str(entry, ".json", 5)
) {
uint64_t gen = IxFileNames_extract_gen(entry);
if (gen > latest_schema_gen) {
latest_schema_gen = gen;
if (!schema_file) { schema_file = CB_Clone(entry); }
else { CB_Mimic(schema_file, (Obj*)entry); }
}
}
}
// Read Schema.
if (!schema_file) {
CharBuf *mess = MAKE_MESS("Can't find a schema file.");
DECREF(files);
return (Obj*)mess;
}
else {
Hash *dump = (Hash*)Json_slurp_json(folder, schema_file);
if (dump) { // read file successfully
DECREF(self->schema);
self->schema = (Schema*)CERTIFY(
VTable_Load_Obj(SCHEMA, (Obj*)dump), SCHEMA);
DECREF(dump);
DECREF(schema_file);
schema_file = NULL;
}
else {
CharBuf *mess = MAKE_MESS("Failed to parse %o", schema_file);
DECREF(schema_file);
DECREF(files);
return (Obj*)mess;
}
}
segments = VA_new(num_segs);
for (i = 0, max = VA_Get_Size(files); i < max; i++) {
CharBuf *entry = (CharBuf*)VA_Fetch(files, i);
// Create a Segment for each segmeta.
if (Seg_valid_seg_name(entry)) {
int64_t seg_num = IxFileNames_extract_gen(entry);
Segment *segment = Seg_new(seg_num);
// Bail if reading the file fails (probably because it's been
// deleted and a new snapshot file has been written so we need to
// retry).
if (Seg_Read_File(segment, folder)) {
VA_Push(segments, (Obj*)segment);
}
else {
CharBuf *mess = MAKE_MESS("Failed to read %o", entry);
DECREF(segment);
DECREF(segments);
DECREF(files);
return (Obj*)mess;
}
}
}
// Sort the segments by age.
VA_Sort(segments, NULL, NULL);
{
Obj *result = PolyReader_Try_Open_SegReaders(self, segments);
DECREF(segments);
DECREF(files);
return result;
}
}
// For test suite.
CharBuf* PolyReader_race_condition_debug1 = NULL;
int32_t PolyReader_debug1_num_passes = 0;
PolyReader*
PolyReader_do_open(PolyReader *self, Obj *index, Snapshot *snapshot,
IndexManager *manager) {
Folder *folder = S_derive_folder(index);
uint64_t last_gen = 0;
PolyReader_init(self, NULL, folder, snapshot, manager, NULL);
DECREF(folder);
if (manager) { S_obtain_deletion_lock(self); }
while (1) {
CharBuf *target_snap_file;
uint64_t gen;
// If a Snapshot was supplied, use its file.
if (snapshot) {
target_snap_file = Snapshot_Get_Path(snapshot);
if (!target_snap_file) {
THROW(ERR, "Supplied snapshot objects must not be empty");
}
else {
CB_Inc_RefCount(target_snap_file);
}
}
else {
// Otherwise, pick the most recent snap file.
target_snap_file = IxFileNames_latest_snapshot(folder);
// No snap file? Looks like the index is empty. We can stop now
// and return NULL.
if (!target_snap_file) { break; }
}
// Derive "generation" of this snapshot file from its name.
gen = IxFileNames_extract_gen(target_snap_file);
// Get a read lock on the most recent snapshot file if indicated.
if (manager) {
S_obtain_read_lock(self, target_snap_file);
}
// Testing only.
if (PolyReader_race_condition_debug1) {
ZombieCharBuf *temp = ZCB_WRAP_STR("temp", 4);
if (Folder_Exists(folder, (CharBuf*)temp)) {
bool_t success = Folder_Rename(folder, (CharBuf*)temp,
PolyReader_race_condition_debug1);
if (!success) { RETHROW(INCREF(Err_get_error())); }
}
PolyReader_debug1_num_passes++;
}
// If a Snapshot object was passed in, the file has already been read.
// If that's not the case, we must read the file we just picked.
if (!snapshot) {
CharBuf *error = PolyReader_try_read_snapshot(self->snapshot, folder,
target_snap_file);
if (error) {
S_release_read_lock(self);
DECREF(target_snap_file);
if (last_gen < gen) { // Index updated, so try again.
DECREF(error);
last_gen = gen;
continue;
}
else { // Real error.
if (manager) { S_release_deletion_lock(self); }
Err_throw_mess(ERR, error);
}
}
}
/* It's possible, though unlikely, for an Indexer to delete files
* out from underneath us after the snapshot file is read but before
* we've got SegReaders holding open all the required files. If we
* failed to open something, see if we can find a newer snapshot file.
* If we can, then the exception was due to the race condition. If
* not, we have a real exception, so throw an error. */
{
Obj *result = S_try_open_elements(self);
if (Obj_Is_A(result, CHARBUF)) { // Error occurred.
S_release_read_lock(self);
DECREF(target_snap_file);
if (last_gen < gen) { // Index updated, so try again.
DECREF(result);
last_gen = gen;
}
else { // Real error.
if (manager) { S_release_deletion_lock(self); }
Err_throw_mess(ERR, (CharBuf*)result);
}
}
else { // Succeeded.
S_init_sub_readers(self, (VArray*)result);
DECREF(result);
DECREF(target_snap_file);
break;
}
}
}
if (manager) { S_release_deletion_lock(self); }
return self;
}
static Folder*
S_derive_folder(Obj *index) {
Folder *folder = NULL;
if (Obj_Is_A(index, FOLDER)) {
folder = (Folder*)INCREF(index);
}
else if (Obj_Is_A(index, CHARBUF)) {
folder = (Folder*)FSFolder_new((CharBuf*)index);
}
else {
THROW(ERR, "Invalid type for 'index': %o", Obj_Get_Class_Name(index));
}
return folder;
}
static void
S_obtain_deletion_lock(PolyReader *self) {
self->deletion_lock = IxManager_Make_Deletion_Lock(self->manager);
Lock_Clear_Stale(self->deletion_lock);
if (!Lock_Obtain(self->deletion_lock)) {
DECREF(self->deletion_lock);
self->deletion_lock = NULL;
THROW(LOCKERR, "Couldn't get commit lock");
}
}
static void
S_obtain_read_lock(PolyReader *self, const CharBuf *snapshot_file_name) {
if (!self->manager) { return; }
self->read_lock = IxManager_Make_Snapshot_Read_Lock(self->manager,
snapshot_file_name);
Lock_Clear_Stale(self->read_lock);
if (!Lock_Obtain(self->read_lock)) {
DECREF(self->read_lock);
THROW(LOCKERR, "Couldn't get read lock for %o", snapshot_file_name);
}
}
static void
S_release_read_lock(PolyReader *self) {
if (self->read_lock) {
Lock_Release(self->read_lock);
DECREF(self->read_lock);
self->read_lock = NULL;
}
}
static void
S_release_deletion_lock(PolyReader *self) {
if (self->deletion_lock) {
Lock_Release(self->deletion_lock);
DECREF(self->deletion_lock);
self->deletion_lock = NULL;
}
}
int32_t
PolyReader_doc_max(PolyReader *self) {
return self->doc_max;
}
int32_t
PolyReader_doc_count(PolyReader *self) {
return self->doc_max - self->del_count;
}
int32_t
PolyReader_del_count(PolyReader *self) {
return self->del_count;
}
I32Array*
PolyReader_offsets(PolyReader *self) {
return (I32Array*)INCREF(self->offsets);
}
VArray*
PolyReader_seg_readers(PolyReader *self) {
return (VArray*)VA_Shallow_Copy(self->sub_readers);
}
VArray*
PolyReader_get_seg_readers(PolyReader *self) {
return self->sub_readers;
}
uint32_t
PolyReader_sub_tick(I32Array *offsets, int32_t doc_id) {
int32_t size = I32Arr_Get_Size(offsets);
if (size == 0) {
return 0;
}
int32_t lo = -1;
int32_t hi = size;
while (hi - lo > 1) {
int32_t mid = lo + ((hi - lo) / 2);
int32_t offset = I32Arr_Get(offsets, mid);
if (doc_id <= offset) {
hi = mid;
}
else {
lo = mid;
}
}
if (hi == size) {
hi--;
}
while (hi > 0) {
int32_t offset = I32Arr_Get(offsets, hi);
if (doc_id <= offset) {
hi--;
}
else {
break;
}
}
return hi;
}