| /* Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #define C_LUCY_SORTFIELDWRITER |
| #include "Lucy/Util/ToolSet.h" |
| #include <math.h> |
| |
| #include "Lucy/Index/SortFieldWriter.h" |
| #include "Lucy/Index/Inverter.h" |
| #include "Lucy/Index/PolyReader.h" |
| #include "Lucy/Index/Segment.h" |
| #include "Lucy/Index/SegReader.h" |
| #include "Lucy/Index/Snapshot.h" |
| #include "Lucy/Index/SortCache.h" |
| #include "Lucy/Index/SortCache/NumericSortCache.h" |
| #include "Lucy/Index/SortCache/TextSortCache.h" |
| #include "Lucy/Index/SortReader.h" |
| #include "Lucy/Index/ZombieKeyedHash.h" |
| #include "Lucy/Plan/FieldType.h" |
| #include "Lucy/Plan/Schema.h" |
| #include "Lucy/Store/Folder.h" |
| #include "Lucy/Store/InStream.h" |
| #include "Lucy/Store/OutStream.h" |
| #include "Lucy/Util/Memory.h" |
| #include "Lucy/Util/MemoryPool.h" |
| #include "Lucy/Util/SortUtils.h" |
| |
| // Prepare to read back a run. |
| static void |
| S_flip_run(SortFieldWriter *run, size_t sub_thresh, InStream *ord_in, |
| InStream *ix_in, InStream *dat_in); |
| |
| // Write out a sort cache. Returns the number of unique values in the sort |
| // cache. |
| static int32_t |
| S_write_files(SortFieldWriter *self, OutStream *ord_out, OutStream *ix_out, |
| OutStream *dat_out); |
| |
| typedef struct lucy_SFWriterElem { |
| Obj *value; |
| int32_t doc_id; |
| } lucy_SFWriterElem; |
| #define SFWriterElem lucy_SFWriterElem |
| |
| SortFieldWriter* |
| SortFieldWriter_new(Schema *schema, Snapshot *snapshot, Segment *segment, |
| PolyReader *polyreader, const CharBuf *field, |
| MemoryPool *memory_pool, size_t mem_thresh, |
| OutStream *temp_ord_out, OutStream *temp_ix_out, |
| OutStream *temp_dat_out) { |
| SortFieldWriter *self |
| = (SortFieldWriter*)VTable_Make_Obj(SORTFIELDWRITER); |
| return SortFieldWriter_init(self, schema, snapshot, segment, polyreader, |
| field, memory_pool, mem_thresh, temp_ord_out, |
| temp_ix_out, temp_dat_out); |
| } |
| |
| SortFieldWriter* |
| SortFieldWriter_init(SortFieldWriter *self, Schema *schema, |
| Snapshot *snapshot, Segment *segment, |
| PolyReader *polyreader, const CharBuf *field, |
| MemoryPool *memory_pool, size_t mem_thresh, |
| OutStream *temp_ord_out, OutStream *temp_ix_out, |
| OutStream *temp_dat_out) { |
| // Init. |
| SortEx_init((SortExternal*)self, sizeof(SFWriterElem)); |
| self->null_ord = -1; |
| self->count = 0; |
| self->ord_start = 0; |
| self->ord_end = 0; |
| self->ix_start = 0; |
| self->ix_end = 0; |
| self->dat_start = 0; |
| self->dat_end = 0; |
| self->run_cardinality = -1; |
| self->run_max = -1; |
| self->sort_cache = NULL; |
| self->doc_map = NULL; |
| self->sorted_ids = NULL; |
| self->run_ord = 0; |
| self->run_tick = 0; |
| self->ord_width = 0; |
| |
| // Assign. |
| self->field = CB_Clone(field); |
| self->schema = (Schema*)INCREF(schema); |
| self->snapshot = (Snapshot*)INCREF(snapshot); |
| self->segment = (Segment*)INCREF(segment); |
| self->polyreader = (PolyReader*)INCREF(polyreader); |
| self->mem_pool = (MemoryPool*)INCREF(memory_pool); |
| self->temp_ord_out = (OutStream*)INCREF(temp_ord_out); |
| self->temp_ix_out = (OutStream*)INCREF(temp_ix_out); |
| self->temp_dat_out = (OutStream*)INCREF(temp_dat_out); |
| self->mem_thresh = mem_thresh; |
| |
| // Derive. |
| self->field_num = Seg_Field_Num(segment, field); |
| FieldType *type = (FieldType*)CERTIFY( |
| Schema_Fetch_Type(self->schema, field), FIELDTYPE); |
| self->type = (FieldType*)INCREF(type); |
| self->prim_id = FType_Primitive_ID(type); |
| if (self->prim_id == FType_TEXT || self->prim_id == FType_BLOB) { |
| self->var_width = true; |
| } |
| else { |
| self->var_width = false; |
| } |
| self->uniq_vals = (Hash*)ZKHash_new(memory_pool, self->prim_id); |
| |
| return self; |
| } |
| |
| void |
| SortFieldWriter_clear_cache(SortFieldWriter *self) { |
| if (self->uniq_vals) { |
| Hash_Clear(self->uniq_vals); |
| } |
| SortFieldWriter_clear_cache_t super_clear_cache |
| = (SortFieldWriter_clear_cache_t)SUPER_METHOD( |
| self->vtable, SortFieldWriter, Clear_Cache); |
| super_clear_cache(self); |
| } |
| |
| void |
| SortFieldWriter_destroy(SortFieldWriter *self) { |
| DECREF(self->uniq_vals); |
| self->uniq_vals = NULL; |
| DECREF(self->field); |
| DECREF(self->schema); |
| DECREF(self->snapshot); |
| DECREF(self->segment); |
| DECREF(self->polyreader); |
| DECREF(self->type); |
| DECREF(self->mem_pool); |
| DECREF(self->temp_ord_out); |
| DECREF(self->temp_ix_out); |
| DECREF(self->temp_dat_out); |
| DECREF(self->ord_in); |
| DECREF(self->ix_in); |
| DECREF(self->dat_in); |
| DECREF(self->sort_cache); |
| DECREF(self->doc_map); |
| FREEMEM(self->sorted_ids); |
| SUPER_DESTROY(self, SORTFIELDWRITER); |
| } |
| |
| int32_t |
| SortFieldWriter_get_null_ord(SortFieldWriter *self) { |
| return self->null_ord; |
| } |
| |
| int32_t |
| SortFieldWriter_get_ord_width(SortFieldWriter *self) { |
| return self->ord_width; |
| } |
| |
| static Obj* |
| S_find_unique_value(Hash *uniq_vals, Obj *val) { |
| int32_t hash_sum = Obj_Hash_Sum(val); |
| Obj *uniq_val = Hash_Find_Key(uniq_vals, val, hash_sum); |
| if (!uniq_val) { |
| Hash_Store(uniq_vals, val, INCREF(&EMPTY)); |
| uniq_val = Hash_Find_Key(uniq_vals, val, hash_sum); |
| } |
| return uniq_val; |
| } |
| |
| void |
| SortFieldWriter_add(SortFieldWriter *self, int32_t doc_id, Obj *value) { |
| // Uniq-ify the value, and record it for this document. |
| SFWriterElem elem; |
| elem.value = S_find_unique_value(self->uniq_vals, value); |
| elem.doc_id = doc_id; |
| SortFieldWriter_Feed(self, &elem); |
| self->count++; |
| } |
| |
| void |
| SortFieldWriter_add_segment(SortFieldWriter *self, SegReader *reader, |
| I32Array *doc_map, SortCache *sort_cache) { |
| if (!sort_cache) { return; } |
| SortFieldWriter *run |
| = SortFieldWriter_new(self->schema, self->snapshot, self->segment, |
| self->polyreader, self->field, self->mem_pool, |
| self->mem_thresh, NULL, NULL, NULL); |
| run->sort_cache = (SortCache*)INCREF(sort_cache); |
| run->doc_map = (I32Array*)INCREF(doc_map); |
| run->run_max = SegReader_Doc_Max(reader); |
| run->run_cardinality = SortCache_Get_Cardinality(sort_cache); |
| run->null_ord = SortCache_Get_Null_Ord(sort_cache); |
| run->run_tick = 1; |
| SortFieldWriter_Add_Run(self, (SortExternal*)run); |
| } |
| |
| static int32_t |
| S_calc_width(int32_t cardinality) { |
| if (cardinality <= 0x00000002) { return 1; } |
| else if (cardinality <= 0x00000004) { return 2; } |
| else if (cardinality <= 0x0000000F) { return 4; } |
| else if (cardinality <= 0x000000FF) { return 8; } |
| else if (cardinality <= 0x0000FFFF) { return 16; } |
| else { return 32; } |
| } |
| |
| static void |
| S_write_ord(void *ords, int32_t width, int32_t doc_id, int32_t ord) { |
| switch (width) { |
| case 1: |
| if (ord) { NumUtil_u1set(ords, doc_id); } |
| else { NumUtil_u1clear(ords, doc_id); } |
| break; |
| case 2: |
| NumUtil_u2set(ords, doc_id, ord); |
| break; |
| case 4: |
| NumUtil_u4set(ords, doc_id, ord); |
| break; |
| case 8: { |
| uint8_t *ints = (uint8_t*)ords; |
| ints[doc_id] = ord; |
| } |
| break; |
| case 16: { |
| uint8_t *bytes = (uint8_t*)ords; |
| bytes += doc_id * sizeof(uint16_t); |
| NumUtil_encode_bigend_u16(ord, &bytes); |
| } |
| break; |
| case 32: { |
| uint8_t *bytes = (uint8_t*)ords; |
| bytes += doc_id * sizeof(uint32_t); |
| NumUtil_encode_bigend_u32(ord, &bytes); |
| } |
| break; |
| default: |
| THROW(ERR, "Invalid width: %i32", width); |
| } |
| } |
| |
| static void |
| S_write_val(Obj *val, int8_t prim_id, OutStream *ix_out, OutStream *dat_out, |
| int64_t dat_start) { |
| if (val) { |
| switch (prim_id & FType_PRIMITIVE_ID_MASK) { |
| case FType_TEXT: { |
| CharBuf *string = (CharBuf*)val; |
| int64_t dat_pos = OutStream_Tell(dat_out) - dat_start; |
| OutStream_Write_I64(ix_out, dat_pos); |
| OutStream_Write_Bytes(dat_out, (char*)CB_Get_Ptr8(string), |
| CB_Get_Size(string)); |
| break; |
| } |
| case FType_BLOB: { |
| ByteBuf *byte_buf = (ByteBuf*)val; |
| int64_t dat_pos = OutStream_Tell(dat_out) - dat_start; |
| OutStream_Write_I64(ix_out, dat_pos); |
| OutStream_Write_Bytes(dat_out, BB_Get_Buf(byte_buf), |
| BB_Get_Size(byte_buf)); |
| break; |
| } |
| case FType_INT32: { |
| Integer32 *i32 = (Integer32*)val; |
| OutStream_Write_I32(dat_out, Int32_Get_Value(i32)); |
| break; |
| } |
| case FType_INT64: { |
| Integer64 *i64 = (Integer64*)val; |
| OutStream_Write_I64(dat_out, Int64_Get_Value(i64)); |
| break; |
| } |
| case FType_FLOAT64: { |
| Float64 *float64 = (Float64*)val; |
| OutStream_Write_F64(dat_out, Float64_Get_Value(float64)); |
| break; |
| } |
| case FType_FLOAT32: { |
| Float32 *float32 = (Float32*)val; |
| OutStream_Write_F32(dat_out, Float32_Get_Value(float32)); |
| break; |
| } |
| default: |
| THROW(ERR, "Unrecognized primitive id: %i32", (int32_t)prim_id); |
| } |
| } |
| else { |
| switch (prim_id & FType_PRIMITIVE_ID_MASK) { |
| case FType_TEXT: |
| case FType_BLOB: { |
| int64_t dat_pos = OutStream_Tell(dat_out) - dat_start; |
| OutStream_Write_I64(ix_out, dat_pos); |
| } |
| break; |
| case FType_INT32: |
| OutStream_Write_I32(dat_out, 0); |
| break; |
| case FType_INT64: |
| OutStream_Write_I64(dat_out, 0); |
| break; |
| case FType_FLOAT64: |
| OutStream_Write_F64(dat_out, 0.0); |
| break; |
| case FType_FLOAT32: |
| OutStream_Write_F32(dat_out, 0.0f); |
| break; |
| default: |
| THROW(ERR, "Unrecognized primitive id: %i32", (int32_t)prim_id); |
| } |
| } |
| } |
| |
| int |
| SortFieldWriter_compare(SortFieldWriter *self, void *va, void *vb) { |
| SFWriterElem *a = (SFWriterElem*)va; |
| SFWriterElem *b = (SFWriterElem*)vb; |
| int32_t comparison |
| = FType_null_back_compare_values(self->type, a->value, b->value); |
| if (comparison == 0) { comparison = b->doc_id - a->doc_id; } |
| return comparison; |
| } |
| |
| static int |
| S_compare_doc_ids_by_ord_rev(void *context, const void *va, const void *vb) { |
| SortCache *sort_cache = (SortCache*)context; |
| int32_t a = *(int32_t*)va; |
| int32_t b = *(int32_t*)vb; |
| int32_t ord_a = SortCache_Ordinal(sort_cache, a); |
| int32_t ord_b = SortCache_Ordinal(sort_cache, b); |
| return ord_a - ord_b; |
| } |
| |
| static void |
| S_lazy_init_sorted_ids(SortFieldWriter *self) { |
| if (!self->sorted_ids) { |
| self->sorted_ids |
| = (int32_t*)MALLOCATE((self->run_max + 1) * sizeof(int32_t)); |
| for (int32_t i = 0, max = self->run_max; i <= max; i++) { |
| self->sorted_ids[i] = i; |
| } |
| Sort_quicksort(self->sorted_ids + 1, self->run_max, sizeof(int32_t), |
| S_compare_doc_ids_by_ord_rev, self->sort_cache); |
| } |
| } |
| |
| void |
| SortFieldWriter_flush(SortFieldWriter *self) { |
| // Don't add a run unless we have data to put in it. |
| if (SortFieldWriter_Cache_Count(self) == 0) { return; } |
| |
| OutStream *const temp_ord_out = self->temp_ord_out; |
| OutStream *const temp_ix_out = self->temp_ix_out; |
| OutStream *const temp_dat_out = self->temp_dat_out; |
| |
| SortFieldWriter_Sort_Cache(self); |
| SortFieldWriter *run |
| = SortFieldWriter_new(self->schema, self->snapshot, self->segment, |
| self->polyreader, self->field, self->mem_pool, |
| self->mem_thresh, NULL, NULL, NULL); |
| |
| // Record stream starts and align. |
| run->ord_start = OutStream_Align(temp_ord_out, sizeof(int64_t)); |
| if (self->var_width) { |
| run->ix_start = OutStream_Align(temp_ix_out, sizeof(int64_t)); |
| } |
| run->dat_start = OutStream_Align(temp_dat_out, sizeof(int64_t)); |
| |
| // Have the run borrow the array of elems. |
| run->cache = self->cache; |
| run->cache_max = self->cache_max; |
| run->cache_tick = self->cache_tick; |
| run->cache_cap = self->cache_cap; |
| |
| // Write files, record stats. |
| run->run_max = (int32_t)Seg_Get_Count(self->segment); |
| run->run_cardinality = S_write_files(run, temp_ord_out, temp_ix_out, |
| temp_dat_out); |
| |
| // Reclaim the buffer from the run and empty it. |
| run->cache = NULL; |
| run->cache_max = 0; |
| run->cache_tick = 0; |
| run->cache_cap = 0; |
| self->cache_tick = self->cache_max; |
| SortFieldWriter_Clear_Cache(self); |
| |
| // Record stream ends. |
| run->ord_end = OutStream_Tell(temp_ord_out); |
| if (self->var_width) { |
| run->ix_end = OutStream_Tell(temp_ix_out); |
| } |
| run->dat_end = OutStream_Tell(temp_dat_out); |
| |
| // Add the run to the array. |
| SortFieldWriter_Add_Run(self, (SortExternal*)run); |
| } |
| |
| uint32_t |
| SortFieldWriter_refill(SortFieldWriter *self) { |
| if (!self->sort_cache) { return 0; } |
| |
| // Sanity check, then reset the cache and prepare to start loading items. |
| uint32_t cache_count = SortFieldWriter_Cache_Count(self); |
| if (cache_count) { |
| THROW(ERR, "Refill called but cache contains %u32 items", |
| cache_count); |
| } |
| SortFieldWriter_Clear_Cache(self); |
| MemPool_Release_All(self->mem_pool); |
| S_lazy_init_sorted_ids(self); |
| |
| const int32_t null_ord = self->null_ord; |
| Hash *const uniq_vals = self->uniq_vals; |
| I32Array *const doc_map = self->doc_map; |
| SortCache *const sort_cache = self->sort_cache; |
| Obj *const blank = SortCache_Make_Blank(sort_cache); |
| |
| while (self->run_ord < self->run_cardinality |
| && MemPool_Get_Consumed(self->mem_pool) < self->mem_thresh |
| ) { |
| Obj *val = SortCache_Value(sort_cache, self->run_ord, blank); |
| if (val) { |
| Hash_Store(uniq_vals, val, INCREF(&EMPTY)); |
| break; |
| } |
| self->run_ord++; |
| } |
| uint32_t count = 0; |
| while (self->run_tick <= self->run_max) { |
| int32_t raw_doc_id = self->sorted_ids[self->run_tick]; |
| int32_t ord = SortCache_Ordinal(sort_cache, raw_doc_id); |
| if (ord != null_ord) { |
| int32_t remapped = doc_map |
| ? I32Arr_Get(doc_map, raw_doc_id) |
| : raw_doc_id; |
| if (remapped) { |
| Obj *val = SortCache_Value(sort_cache, ord, blank); |
| SortFieldWriter_Add(self, remapped, val); |
| count++; |
| } |
| } |
| else if (ord > self->run_ord) { |
| break; |
| } |
| self->run_tick++; |
| } |
| self->run_ord++; |
| SortFieldWriter_Sort_Cache(self); |
| |
| if (self->run_ord >= self->run_cardinality) { |
| DECREF(self->sort_cache); |
| self->sort_cache = NULL; |
| } |
| |
| DECREF(blank); |
| return count; |
| } |
| |
| void |
| SortFieldWriter_flip(SortFieldWriter *self) { |
| uint32_t num_items = SortFieldWriter_Cache_Count(self); |
| uint32_t num_runs = VA_Get_Size(self->runs); |
| |
| if (self->flipped) { THROW(ERR, "Can't call Flip() twice"); } |
| self->flipped = true; |
| |
| // Sanity check. |
| if (num_runs && num_items) { |
| THROW(ERR, "Sanity check failed: num_runs: %u32 num_items: %u32", |
| num_runs, num_items); |
| } |
| |
| if (num_items) { |
| SortFieldWriter_Sort_Cache(self); |
| } |
| else if (num_runs) { |
| Folder *folder = PolyReader_Get_Folder(self->polyreader); |
| CharBuf *seg_name = Seg_Get_Name(self->segment); |
| CharBuf *filepath = CB_newf("%o/sort_ord_temp", seg_name); |
| self->ord_in = Folder_Open_In(folder, filepath); |
| if (!self->ord_in) { RETHROW(INCREF(Err_get_error())); } |
| if (self->var_width) { |
| CB_setf(filepath, "%o/sort_ix_temp", seg_name); |
| self->ix_in = Folder_Open_In(folder, filepath); |
| if (!self->ix_in) { RETHROW(INCREF(Err_get_error())); } |
| } |
| CB_setf(filepath, "%o/sort_dat_temp", seg_name); |
| self->dat_in = Folder_Open_In(folder, filepath); |
| if (!self->dat_in) { RETHROW(INCREF(Err_get_error())); } |
| DECREF(filepath); |
| |
| // Assign streams and a slice of mem_thresh. |
| size_t sub_thresh = self->mem_thresh / num_runs; |
| if (sub_thresh < 65536) { sub_thresh = 65536; } |
| for (uint32_t i = 0; i < num_runs; i++) { |
| SortFieldWriter *run = (SortFieldWriter*)VA_Fetch(self->runs, i); |
| S_flip_run(run, sub_thresh, self->ord_in, self->ix_in, |
| self->dat_in); |
| } |
| } |
| |
| self->flipped = true; |
| } |
| |
| static int32_t |
| S_write_files(SortFieldWriter *self, OutStream *ord_out, OutStream *ix_out, |
| OutStream *dat_out) { |
| int8_t prim_id = self->prim_id; |
| int32_t doc_max = (int32_t)Seg_Get_Count(self->segment); |
| bool_t has_nulls = self->count == doc_max ? false : true; |
| size_t size = (doc_max + 1) * sizeof(int32_t); |
| int32_t *ords = (int32_t*)MALLOCATE(size); |
| int32_t ord = 0; |
| int64_t dat_start = OutStream_Tell(dat_out); |
| |
| // Assign -1 as a stand-in for the NULL ord. |
| for (int32_t i = 0; i <= doc_max; i++) { |
| ords[i] = -1; |
| } |
| |
| // Grab the first item and record its ord. Add a dummy ord for invalid |
| // doc id 0. |
| SFWriterElem *elem = (SFWriterElem*)SortFieldWriter_Fetch(self); |
| ords[elem->doc_id] = ord; |
| ords[0] = 0; |
| |
| // Build array of ords, write non-NULL sorted values. |
| Obj *val = Obj_Clone(elem->value); |
| Obj *last_val_address = elem->value; |
| S_write_val(elem->value, prim_id, ix_out, dat_out, dat_start); |
| while (NULL != (elem = (SFWriterElem*)SortFieldWriter_Fetch(self))) { |
| if (elem->value != last_val_address) { |
| int32_t comparison |
| = FType_Compare_Values(self->type, elem->value, val); |
| if (comparison != 0) { |
| ord++; |
| S_write_val(elem->value, prim_id, ix_out, dat_out, dat_start); |
| Obj_Mimic(val, elem->value); |
| } |
| last_val_address = elem->value; |
| } |
| ords[elem->doc_id] = ord; |
| } |
| DECREF(val); |
| |
| // If there are NULL values, write one now and record the NULL ord. |
| if (has_nulls) { |
| S_write_val(NULL, prim_id, ix_out, dat_out, dat_start); |
| ord++; |
| self->null_ord = ord; |
| } |
| int32_t null_ord = self->null_ord; |
| |
| // Write one extra file pointer so that we can always derive length. |
| if (self->var_width) { |
| OutStream_Write_I64(ix_out, OutStream_Tell(dat_out) - dat_start); |
| } |
| |
| // Calculate cardinality and ord width. |
| int32_t cardinality = ord + 1; |
| self->ord_width = S_calc_width(cardinality); |
| int32_t ord_width = self->ord_width; |
| |
| // Write ords. |
| const double BITS_PER_BYTE = 8.0; |
| double bytes_per_doc = ord_width / BITS_PER_BYTE; |
| double byte_count = ceil((doc_max + 1) * bytes_per_doc); |
| char *compressed_ords |
| = (char*)CALLOCATE((size_t)byte_count, sizeof(char)); |
| for (int32_t i = 0; i <= doc_max; i++) { |
| int32_t real_ord = ords[i] == -1 ? null_ord : ords[i]; |
| S_write_ord(compressed_ords, ord_width, i, real_ord); |
| } |
| OutStream_Write_Bytes(ord_out, compressed_ords, (size_t)byte_count); |
| FREEMEM(compressed_ords); |
| |
| FREEMEM(ords); |
| return cardinality; |
| } |
| |
| int32_t |
| SortFieldWriter_finish(SortFieldWriter *self) { |
| // Bail if there's no data. |
| if (!SortFieldWriter_Peek(self)) { return 0; } |
| |
| int32_t field_num = self->field_num; |
| Folder *folder = PolyReader_Get_Folder(self->polyreader); |
| CharBuf *seg_name = Seg_Get_Name(self->segment); |
| CharBuf *path = CB_newf("%o/sort-%i32.ord", seg_name, field_num); |
| |
| // Open streams. |
| OutStream *ord_out = Folder_Open_Out(folder, path); |
| if (!ord_out) { RETHROW(INCREF(Err_get_error())); } |
| OutStream *ix_out = NULL; |
| if (self->var_width) { |
| CB_setf(path, "%o/sort-%i32.ix", seg_name, field_num); |
| ix_out = Folder_Open_Out(folder, path); |
| if (!ix_out) { RETHROW(INCREF(Err_get_error())); } |
| } |
| CB_setf(path, "%o/sort-%i32.dat", seg_name, field_num); |
| OutStream *dat_out = Folder_Open_Out(folder, path); |
| if (!dat_out) { RETHROW(INCREF(Err_get_error())); } |
| DECREF(path); |
| |
| int32_t cardinality = S_write_files(self, ord_out, ix_out, dat_out); |
| |
| // Close streams. |
| OutStream_Close(ord_out); |
| if (ix_out) { OutStream_Close(ix_out); } |
| OutStream_Close(dat_out); |
| DECREF(dat_out); |
| DECREF(ix_out); |
| DECREF(ord_out); |
| |
| return cardinality; |
| } |
| |
| static void |
| S_flip_run(SortFieldWriter *run, size_t sub_thresh, InStream *ord_in, |
| InStream *ix_in, InStream *dat_in) { |
| if (run->flipped) { THROW(ERR, "Can't Flip twice"); } |
| run->flipped = true; |
| |
| // Get our own MemoryPool, ZombieKeyedHash, and slice of mem_thresh. |
| DECREF(run->uniq_vals); |
| DECREF(run->mem_pool); |
| run->mem_pool = MemPool_new(0); |
| run->uniq_vals = (Hash*)ZKHash_new(run->mem_pool, run->prim_id); |
| run->mem_thresh = sub_thresh; |
| |
| // Done if we already have a SortCache to read from. |
| if (run->sort_cache) { return; } |
| |
| // Open the temp files for reading. |
| CharBuf *seg_name = Seg_Get_Name(run->segment); |
| CharBuf *alias = CB_newf("%o/sort_ord_temp-%i64-to-%i64", seg_name, |
| run->ord_start, run->ord_end); |
| InStream *ord_in_dupe = InStream_Reopen(ord_in, alias, run->ord_start, |
| run->ord_end - run->ord_start); |
| InStream *ix_in_dupe = NULL; |
| if (run->var_width) { |
| CB_setf(alias, "%o/sort_ix_temp-%i64-to-%i64", seg_name, |
| run->ix_start, run->ix_end); |
| ix_in_dupe = InStream_Reopen(ix_in, alias, run->ix_start, |
| run->ix_end - run->ix_start); |
| } |
| CB_setf(alias, "%o/sort_dat_temp-%i64-to-%i64", seg_name, |
| run->dat_start, run->dat_end); |
| InStream *dat_in_dupe = InStream_Reopen(dat_in, alias, run->dat_start, |
| run->dat_end - run->dat_start); |
| DECREF(alias); |
| |
| // Get a SortCache. |
| CharBuf *field = Seg_Field_Name(run->segment, run->field_num); |
| switch (run->prim_id & FType_PRIMITIVE_ID_MASK) { |
| case FType_TEXT: |
| run->sort_cache = (SortCache*)TextSortCache_new( |
| field, run->type, run->run_cardinality, |
| run->run_max, run->null_ord, |
| run->ord_width, ord_in_dupe, |
| ix_in_dupe, dat_in_dupe); |
| break; |
| case FType_INT32: |
| run->sort_cache = (SortCache*)I32SortCache_new( |
| field, run->type, run->run_cardinality, |
| run->run_max, run->null_ord, |
| run->ord_width, ord_in_dupe, |
| dat_in_dupe); |
| break; |
| case FType_INT64: |
| run->sort_cache = (SortCache*)I64SortCache_new( |
| field, run->type, run->run_cardinality, |
| run->run_max, run->null_ord, |
| run->ord_width, ord_in_dupe, |
| dat_in_dupe); |
| break; |
| case FType_FLOAT32: |
| run->sort_cache = (SortCache*)F32SortCache_new( |
| field, run->type, run->run_cardinality, |
| run->run_max, run->null_ord, |
| run->ord_width, ord_in_dupe, |
| dat_in_dupe); |
| break; |
| case FType_FLOAT64: |
| run->sort_cache = (SortCache*)F64SortCache_new( |
| field, run->type, run->run_cardinality, |
| run->run_max, run->null_ord, |
| run->ord_width, ord_in_dupe, |
| dat_in_dupe); |
| break; |
| default: |
| THROW(ERR, "No SortCache class for %o", run->type); |
| } |
| |
| DECREF(ord_in_dupe); |
| DECREF(ix_in_dupe); |
| DECREF(dat_in_dupe); |
| } |
| |
| |