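Fix format; replace batch time/value decoding with row-by-row decoding and pass min_time_hint, row_offset and row_limit through AlignedChunkReader (removes RowAppender::remaining/add_rows, Decoder::read_batch_*/skip_* and Filter::satisfy_batch_time).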
diff --git a/cpp/src/common/tsblock/tsblock.h b/cpp/src/common/tsblock/tsblock.h index c7744f0..859ad39 100644 --- a/cpp/src/common/tsblock/tsblock.h +++ b/cpp/src/common/tsblock/tsblock.h
@@ -144,12 +144,6 @@ ASSERT(tsblock_->row_count_ > 0); tsblock_->row_count_--; } - FORCE_INLINE uint32_t remaining() const { - return tsblock_->max_row_count_ - tsblock_->row_count_; - } - FORCE_INLINE void add_rows(uint32_t count) { - tsblock_->row_count_ += count; - } FORCE_INLINE void append(uint32_t slot_index, const char* value, uint32_t len) {
diff --git a/cpp/src/encoding/decoder.h b/cpp/src/encoding/decoder.h index f85ccf1..c290b57 100644 --- a/cpp/src/encoding/decoder.h +++ b/cpp/src/encoding/decoder.h
@@ -21,7 +21,6 @@ #define ENCODING_DECODER_H #include "common/allocator/byte_stream.h" -#include "common/db_common.h" namespace storage { @@ -38,102 +37,6 @@ virtual int read_double(double& ret_value, common::ByteStream& in) = 0; virtual int read_String(common::String& ret_value, common::PageArena& pa, common::ByteStream& in) = 0; - - virtual int read_batch_int32(int32_t* out, int capacity, int& actual, - common::ByteStream& in) { - actual = 0; - int ret = common::E_OK; - int32_t val; - while (actual < capacity && has_remaining(in)) { - ret = read_int32(val, in); - if (ret != common::E_OK) return ret; - out[actual++] = val; - } - return common::E_OK; - } - - virtual int read_batch_int64(int64_t* out, int capacity, int& actual, - common::ByteStream& in) { - actual = 0; - int ret = common::E_OK; - int64_t val; - while (actual < capacity && has_remaining(in)) { - ret = read_int64(val, in); - if (ret != common::E_OK) return ret; - out[actual++] = val; - } - return common::E_OK; - } - - virtual int read_batch_float(float* out, int capacity, int& actual, - common::ByteStream& in) { - actual = 0; - int ret = common::E_OK; - float val; - while (actual < capacity && has_remaining(in)) { - ret = read_float(val, in); - if (ret != common::E_OK) return ret; - out[actual++] = val; - } - return common::E_OK; - } - - virtual int read_batch_double(double* out, int capacity, int& actual, - common::ByteStream& in) { - actual = 0; - int ret = common::E_OK; - double val; - while (actual < capacity && has_remaining(in)) { - ret = read_double(val, in); - if (ret != common::E_OK) return ret; - out[actual++] = val; - } - return common::E_OK; - } - - virtual int skip_int32(int count, int& skipped, common::ByteStream& in) { - skipped = 0; - int32_t dummy; - while (skipped < count && has_remaining(in)) { - int ret = read_int32(dummy, in); - if (ret != common::E_OK) return ret; - ++skipped; - } - return common::E_OK; - } - - virtual int skip_int64(int count, int& skipped, common::ByteStream& 
in) { - skipped = 0; - int64_t dummy; - while (skipped < count && has_remaining(in)) { - int ret = read_int64(dummy, in); - if (ret != common::E_OK) return ret; - ++skipped; - } - return common::E_OK; - } - - virtual int skip_float(int count, int& skipped, common::ByteStream& in) { - skipped = 0; - float dummy; - while (skipped < count && has_remaining(in)) { - int ret = read_float(dummy, in); - if (ret != common::E_OK) return ret; - ++skipped; - } - return common::E_OK; - } - - virtual int skip_double(int count, int& skipped, common::ByteStream& in) { - skipped = 0; - double dummy; - while (skipped < count && has_remaining(in)) { - int ret = read_double(dummy, in); - if (ret != common::E_OK) return ret; - ++skipped; - } - return common::E_OK; - } }; } // end namespace storage
diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc index 1150e40..7688ed1 100644 --- a/cpp/src/reader/aligned_chunk_reader.cc +++ b/cpp/src/reader/aligned_chunk_reader.cc
@@ -168,7 +168,9 @@ int AlignedChunkReader::get_next_page(TsBlock* ret_tsblock, Filter* oneshoot_filter, PageArena& pa) { - return get_next_page_multi(ret_tsblock, oneshoot_filter, pa); + return get_next_page_multi(ret_tsblock, oneshoot_filter, pa, + std::numeric_limits<int64_t>::min(), nullptr, + nullptr); } int AlignedChunkReader::get_cur_page_header(ChunkMeta*& chunk_meta, @@ -332,7 +334,8 @@ if (row_limit == 0) { return E_NO_MORE_DATA; } - return get_next_page_multi(ret_tsblock, oneshoot_filter, pa); + return get_next_page_multi(ret_tsblock, oneshoot_filter, pa, min_time_hint, + &row_offset, &row_limit); } int AlignedChunkReader::load_by_aligned_meta_multi( @@ -448,15 +451,20 @@ int AlignedChunkReader::get_next_page_multi(TsBlock* ret_tsblock, Filter* oneshoot_filter, - PageArena& pa) { + PageArena& pa, + int64_t min_time_hint, + int* row_offset, int* row_limit) { int ret = E_OK; Filter* filter = (oneshoot_filter != nullptr ? oneshoot_filter : time_filter_); + if (row_limit != nullptr && *row_limit == 0) return E_NO_MORE_DATA; + // Resume chunk-level scatter from previous E_OVERFLOW. if (chunk_level_active_) { RowAppender row_appender(ret_tsblock); - ret = scatter_chunk_pages(ret_tsblock, row_appender, filter, &pa); + ret = scatter_chunk_pages(ret_tsblock, row_appender, filter, &pa, + row_offset, row_limit); if (ret != E_OVERFLOW) { cleanup_chunk_decode(); } else { @@ -482,7 +490,8 @@ chunk_level_active_ = true; RowAppender row_appender(ret_tsblock); - ret = scatter_chunk_pages(ret_tsblock, row_appender, filter, &pa); + ret = scatter_chunk_pages(ret_tsblock, row_appender, filter, &pa, + row_offset, row_limit); if (ret != E_OVERFLOW) { cleanup_chunk_decode(); } else { @@ -493,18 +502,21 @@ #endif // Serial fallback. 
- return get_next_page_multi_serial(ret_tsblock, filter, pa); + return get_next_page_multi_serial(ret_tsblock, filter, pa, min_time_hint, + row_offset); } int AlignedChunkReader::get_next_page_multi_serial(TsBlock* ret_tsblock, Filter* filter, - PageArena& pa) { + PageArena& pa, + int64_t min_time_hint, + int* row_offset) { int ret = E_OK; bool pt = prev_time_page_not_finish(); bool pv = prev_any_value_page_not_finish_multi(); if (pt && pv) { - ret = - decode_time_value_buf_into_tsblock_multi(ret_tsblock, filter, &pa); + ret = decode_time_value_buf_into_tsblock_multi(ret_tsblock, filter, &pa, + row_offset, nullptr); return ret; } if (!pt && !pv) { @@ -523,8 +535,25 @@ } } if (IS_FAIL(ret)) break; - if (cur_page_statisify_filter_multi(filter)) break; - if (RET_FAIL(skip_cur_page_multi())) break; + if (!cur_page_statisify_filter_multi(filter)) { + if (RET_FAIL(skip_cur_page_multi())) break; + } else if (min_time_hint != std::numeric_limits<int64_t>::min() && + cur_time_page_header_.statistic_ != nullptr && + cur_time_page_header_.statistic_->end_time_ < + min_time_hint) { + // Skip page whose time range is entirely before hint. + if (RET_FAIL(skip_cur_page_multi())) break; + } else if (row_offset != nullptr && *row_offset > 0 && + cur_time_page_header_.statistic_ != nullptr && + cur_time_page_header_.statistic_->count_ > 0 && + *row_offset >= + cur_time_page_header_.statistic_->count_) { + // Skip entire page by offset. 
+ *row_offset -= cur_time_page_header_.statistic_->count_; + if (RET_FAIL(skip_cur_page_multi())) break; + } else { + break; + } if (!has_more_data()) { ret = E_NO_MORE_DATA; break; @@ -536,8 +565,8 @@ } } if (IS_SUCC(ret)) { - ret = - decode_time_value_buf_into_tsblock_multi(ret_tsblock, filter, &pa); + ret = decode_time_value_buf_into_tsblock_multi(ret_tsblock, filter, &pa, + row_offset, nullptr); } return ret; } @@ -657,10 +686,12 @@ } int AlignedChunkReader::decode_time_value_buf_into_tsblock_multi( - TsBlock*& ret_tsblock, Filter* filter, PageArena* pa) { + TsBlock*& ret_tsblock, Filter* filter, PageArena* pa, int* row_offset, + int* row_limit) { int ret = E_OK; RowAppender row_appender(ret_tsblock); - ret = multi_DECODE_TV_BATCH(ret_tsblock, row_appender, filter, pa); + ret = multi_decode_tv_row_by_row(ret_tsblock, row_appender, filter, pa, + row_offset, row_limit); // Release uncompressed buffers if pages are done if (ret != E_OVERFLOW) { @@ -689,224 +720,165 @@ return ret; } -int AlignedChunkReader::multi_DECODE_TV_BATCH(TsBlock* ret_tsblock, - RowAppender& row_appender, - Filter* filter, PageArena* pa) { +int AlignedChunkReader::multi_decode_tv_row_by_row( + TsBlock* ret_tsblock, RowAppender& row_appender, Filter* filter, + PageArena* pa, int* row_offset, int* row_limit) { int ret = E_OK; - const int BATCH = 129; - int64_t times[BATCH]; const uint32_t null_mask_base = 1 << 7; const uint32_t num_cols = value_columns_.size(); + int64_t time = 0; + + auto skip_value = [](ValueColumnState* col, common::PageArena* pa) { + switch (col->chunk_header.data_type_) { + case common::BOOLEAN: { + bool d; + col->decoder->read_boolean(d, col->in); + break; + } + case common::INT32: + case common::DATE: { + int32_t d; + col->decoder->read_int32(d, col->in); + break; + } + case common::INT64: + case common::TIMESTAMP: { + int64_t d; + col->decoder->read_int64(d, col->in); + break; + } + case common::FLOAT: { + float d; + col->decoder->read_float(d, col->in); + break; + } + 
case common::DOUBLE: { + double d; + col->decoder->read_double(d, col->in); + break; + } + case common::STRING: + case common::TEXT: + case common::BLOB: { + common::String d; + col->decoder->read_String(d, *pa, col->in); + break; + } + default: + break; + } + }; + + auto read_and_append_value = [&](ValueColumnState* col, uint32_t slot, + RowAppender& ra, common::PageArena* pa) { + switch (col->chunk_header.data_type_) { + case common::BOOLEAN: { + bool v; + col->decoder->read_boolean(v, col->in); + ra.append(slot, (char*)&v, sizeof(v)); + break; + } + case common::INT32: + case common::DATE: { + int32_t v; + col->decoder->read_int32(v, col->in); + ra.append(slot, (char*)&v, sizeof(v)); + break; + } + case common::INT64: + case common::TIMESTAMP: { + int64_t v; + col->decoder->read_int64(v, col->in); + ra.append(slot, (char*)&v, sizeof(v)); + break; + } + case common::FLOAT: { + float v; + col->decoder->read_float(v, col->in); + ra.append(slot, (char*)&v, sizeof(v)); + break; + } + case common::DOUBLE: { + double v; + col->decoder->read_double(v, col->in); + ra.append(slot, (char*)&v, sizeof(v)); + break; + } + case common::STRING: + case common::TEXT: + case common::BLOB: { + common::String v; + col->decoder->read_String(v, *pa, col->in); + ra.append(slot, v.buf_, v.len_); + break; + } + default: + ra.append_null(slot); + break; + } + }; while (time_decoder_->has_remaining(time_in_)) { - if (row_appender.remaining() < (uint32_t)BATCH) { + if (row_limit != nullptr && *row_limit == 0) break; + + // Check capacity BEFORE consuming timestamp + if (UNLIKELY(!row_appender.add_row())) { ret = E_OVERFLOW; break; } - // ── Phase 1: Decode a batch of timestamps ── - int time_count = 0; - if (RET_FAIL(time_decoder_->read_batch_int64(times, BATCH, time_count, - time_in_))) { + ret = time_decoder_->read_int64(time, time_in_); + if (ret != E_OK) { + row_appender.backoff_add_row(); break; } - if (time_count == 0) break; - // ── Phase 2: Apply time filter ── - bool 
time_mask[BATCH]; - bool block_all_pass = (filter == nullptr); - int pass_count = time_count; - if (!block_all_pass) { - pass_count = - filter->satisfy_batch_time(times, time_count, time_mask); - } - - // ── Phase 3: Per-column null check + value decode ── - struct ColBatch { - bool is_null[BATCH]; - int nonnull_count; - char val_buf[BATCH * 8]; - int val_count; - }; - std::vector<ColBatch> col_batches(num_cols); - + // Advance value index for all columns for (uint32_t c = 0; c < num_cols; c++) { - auto* col = value_columns_[c]; - auto& cb = col_batches[c]; - cb.nonnull_count = 0; - cb.val_count = 0; - for (int i = 0; i < time_count; i++) { - int vi = col->cur_value_index + 1 + i; - if (col->notnull_bitmap.empty() || - ((col->notnull_bitmap[vi / 8] & 0xFF) & - (null_mask_base >> (vi % 8))) == 0) { - cb.is_null[i] = true; - } else { - cb.is_null[i] = false; - cb.nonnull_count++; - } - } - - // Skip values if no rows pass time filter - if (pass_count == 0 && cb.nonnull_count > 0) { - switch (col->chunk_header.data_type_) { - case common::BOOLEAN: { - for (int s = 0; s < cb.nonnull_count; s++) { - bool dummy; - col->decoder->read_boolean(dummy, col->in); - } - break; - } - case common::INT32: - case common::DATE: { - int sk = 0; - col->decoder->skip_int32(cb.nonnull_count, sk, col->in); - break; - } - case common::INT64: - case common::TIMESTAMP: { - int sk = 0; - col->decoder->skip_int64(cb.nonnull_count, sk, col->in); - break; - } - case common::FLOAT: { - int sk = 0; - col->decoder->skip_float(cb.nonnull_count, sk, col->in); - break; - } - case common::DOUBLE: { - int sk = 0; - col->decoder->skip_double(cb.nonnull_count, sk, - col->in); - break; - } - default: - break; - } - cb.nonnull_count = 0; - } - - // Decode non-null values - if (cb.nonnull_count > 0) { - switch (col->chunk_header.data_type_) { - case common::BOOLEAN: { - bool* out = reinterpret_cast<bool*>(cb.val_buf); - cb.val_count = 0; - for (int s = 0; s < cb.nonnull_count; s++) { - bool v; - if 
(col->decoder->read_boolean(v, col->in) != - common::E_OK) - break; - out[cb.val_count++] = v; - } - break; - } - case common::INT32: - case common::DATE: - col->decoder->read_batch_int32( - reinterpret_cast<int32_t*>(cb.val_buf), - cb.nonnull_count, cb.val_count, col->in); - break; - case common::INT64: - case common::TIMESTAMP: - col->decoder->read_batch_int64( - reinterpret_cast<int64_t*>(cb.val_buf), - cb.nonnull_count, cb.val_count, col->in); - break; - case common::FLOAT: - col->decoder->read_batch_float( - reinterpret_cast<float*>(cb.val_buf), - cb.nonnull_count, cb.val_count, col->in); - break; - case common::DOUBLE: - col->decoder->read_batch_double( - reinterpret_cast<double*>(cb.val_buf), - cb.nonnull_count, cb.val_count, col->in); - break; - default: - break; - } - } + value_columns_[c]->cur_value_index++; } - // ── Phase 4: Skip if no rows pass ── - if (pass_count == 0) { + // Time filter — skip row + bool skip_row = + (filter != nullptr && !filter->satisfy_start_end_time(time, time)); + + // Offset skip — skip row but count it + if (!skip_row && row_offset != nullptr && *row_offset > 0) { + (*row_offset)--; + skip_row = true; + } + + if (skip_row) { + row_appender.backoff_add_row(); for (uint32_t c = 0; c < num_cols; c++) { - value_columns_[c]->cur_value_index += time_count; + auto* col = value_columns_[c]; + int vi = col->cur_value_index; + bool is_nonnull = !col->notnull_bitmap.empty() && + ((col->notnull_bitmap[vi / 8] & 0xFF) & + (null_mask_base >> (vi % 8))) != 0; + if (is_nonnull && col->decoder->has_remaining(col->in)) { + skip_value(col, pa); + } } continue; } - // ── Phase 5: Scatter into TsBlock ── + row_appender.append(0, (char*)&time, sizeof(time)); - // Fast path: all rows pass filter AND all columns have no nulls - if (pass_count == time_count) { - bool all_nonnull = true; - for (uint32_t c = 0; c < num_cols; c++) { - if (col_batches[c].nonnull_count != time_count) { - all_nonnull = false; - break; - } - } - if (all_nonnull) { - 
common::Vector* time_vec = ret_tsblock->get_vector(0); - time_vec->get_value_data().append_fixed_value( - (const char*)times, - static_cast<uint32_t>(time_count) * sizeof(int64_t)); - for (uint32_t c = 0; c < num_cols; c++) { - auto& cb = col_batches[c]; - auto* col = value_columns_[c]; - uint32_t elem_size = common::get_data_type_size( - col->chunk_header.data_type_); - common::Vector* vec = ret_tsblock->get_vector(c + 1); - vec->get_value_data().append_fixed_value( - cb.val_buf, - static_cast<uint32_t>(cb.val_count) * elem_size); - col->cur_value_index += time_count; - } - row_appender.add_rows(static_cast<uint32_t>(time_count)); - continue; + for (uint32_t c = 0; c < num_cols; c++) { + auto* col = value_columns_[c]; + int vi = col->cur_value_index; + bool is_nonnull = !col->notnull_bitmap.empty() && + ((col->notnull_bitmap[vi / 8] & 0xFF) & + (null_mask_base >> (vi % 8))) != 0; + + if (!is_nonnull || !col->decoder->has_remaining(col->in)) { + row_appender.append_null(c + 1); + } else { + read_and_append_value(col, c + 1, row_appender, pa); } } - - // Slow path: per-row scatter - std::vector<int> val_idx(num_cols, 0); - - for (int i = 0; i < time_count; i++) { - bool passes = block_all_pass || time_mask[i]; - - if (!passes) { - for (uint32_t c = 0; c < num_cols; c++) { - value_columns_[c]->cur_value_index++; - if (!col_batches[c].is_null[i]) val_idx[c]++; - } - continue; - } - - if (UNLIKELY(!row_appender.add_row())) { - ret = E_OVERFLOW; - break; - } - - row_appender.append(0, (char*)&times[i], sizeof(int64_t)); - - for (uint32_t c = 0; c < num_cols; c++) { - value_columns_[c]->cur_value_index++; - auto& cb = col_batches[c]; - auto* col = value_columns_[c]; - - if (cb.is_null[i]) { - row_appender.append_null(c + 1); - } else { - uint32_t elem_size = common::get_data_type_size( - col->chunk_header.data_type_); - row_appender.append( - c + 1, cb.val_buf + val_idx[c] * elem_size, elem_size); - val_idx[c]++; - } - } - } - if (ret != E_OK) break; + if (row_limit != 
nullptr) (*row_limit)--; } return ret; } @@ -1063,13 +1035,10 @@ ts_in.wrap_from(ub, us); time_decoder_->reset(); td.times.clear(); - const int BS = 1024; - int64_t buf[BS]; while (time_decoder_->has_remaining(ts_in)) { - int actual = 0; - time_decoder_->read_batch_int64(buf, BS, actual, ts_in); - if (actual == 0) break; - td.times.insert(td.times.end(), buf, buf + actual); + int64_t t; + if (time_decoder_->read_int64(t, ts_in) != E_OK) break; + td.times.push_back(t); } td.count = (int)td.times.size(); time_compressor_->after_uncompress(ub); @@ -1161,27 +1130,43 @@ break; } case common::INT32: - case common::DATE: - col->decoder->read_batch_int32( - reinterpret_cast<int32_t*>(cp.values.data()), nn, - cp.nonnull_count, vi); + case common::DATE: { + int32_t* out = reinterpret_cast<int32_t*>(cp.values.data()); + for (int s = 0; s < nn; s++) { + int32_t v; + if (col->decoder->read_int32(v, vi) != E_OK) break; + out[cp.nonnull_count++] = v; + } break; + } case common::INT64: - case common::TIMESTAMP: - col->decoder->read_batch_int64( - reinterpret_cast<int64_t*>(cp.values.data()), nn, - cp.nonnull_count, vi); + case common::TIMESTAMP: { + int64_t* out = reinterpret_cast<int64_t*>(cp.values.data()); + for (int s = 0; s < nn; s++) { + int64_t v; + if (col->decoder->read_int64(v, vi) != E_OK) break; + out[cp.nonnull_count++] = v; + } break; - case common::FLOAT: - col->decoder->read_batch_float( - reinterpret_cast<float*>(cp.values.data()), nn, - cp.nonnull_count, vi); + } + case common::FLOAT: { + float* out = reinterpret_cast<float*>(cp.values.data()); + for (int s = 0; s < nn; s++) { + float v; + if (col->decoder->read_float(v, vi) != E_OK) break; + out[cp.nonnull_count++] = v; + } break; - case common::DOUBLE: - col->decoder->read_batch_double( - reinterpret_cast<double*>(cp.values.data()), nn, - cp.nonnull_count, vi); + } + case common::DOUBLE: { + double* out = reinterpret_cast<double*>(cp.values.data()); + for (int s = 0; s < nn; s++) { + double v; + if 
(col->decoder->read_double(v, vi) != E_OK) break; + out[cp.nonnull_count++] = v; + } break; + } default: break; } @@ -1207,18 +1192,36 @@ int AlignedChunkReader::scatter_chunk_pages(TsBlock* ret_tsblock, RowAppender& row_appender, - Filter* filter, PageArena* pa) { + Filter* filter, PageArena* pa, + int* row_offset, int* row_limit) { int ret = E_OK; const uint32_t null_mask_base = 1 << 7; const uint32_t num_cols = value_columns_.size(); const size_t np = chunk_pages_.size(); while ((size_t)chunk_page_cursor_ < np) { + if (row_limit != nullptr && *row_limit == 0) break; + auto& td = chunk_times_[chunk_page_cursor_]; if (td.cursor >= td.count) { chunk_page_cursor_++; continue; } + + // Page-level offset skip: skip entire pre-decoded page. + if (row_offset != nullptr && *row_offset > 0 && + *row_offset >= (td.count - td.cursor)) { + *row_offset -= (td.count - td.cursor); + // Advance read_pos for all columns + for (uint32_t c = 0; c < num_cols; c++) { + auto& cp = chunk_cols_[c][chunk_page_cursor_]; + cp.read_pos = cp.nonnull_count; // fully consumed + } + td.cursor = td.count; + chunk_page_cursor_++; + continue; + } + auto& info = chunk_pages_[chunk_page_cursor_]; bool need_filter = (info.pass_type == PagePassType::BOUNDARY); @@ -1239,33 +1242,38 @@ if (can_bulk) { while (td.cursor < td.count) { - int avail = (int)row_appender.remaining(); - if (avail <= 0) return E_OVERFLOW; - int batch = std::min(td.count - td.cursor, avail); - - ret_tsblock->get_vector(0)->get_value_data().append_fixed_value( - (const char*)&td.times[td.cursor], - static_cast<uint32_t>(batch) * sizeof(int64_t)); + if (row_limit != nullptr && *row_limit == 0) break; + // Row-level offset skip + if (row_offset != nullptr && *row_offset > 0) { + (*row_offset)--; + for (uint32_t c = 0; c < num_cols; c++) + chunk_cols_[c][chunk_page_cursor_].read_pos++; + td.cursor++; + continue; + } + if (UNLIKELY(!row_appender.add_row())) return E_OVERFLOW; + int64_t t = td.times[td.cursor]; + row_appender.append(0, 
(char*)&t, sizeof(int64_t)); for (uint32_t c = 0; c < num_cols; c++) { auto& cp = chunk_cols_[c][chunk_page_cursor_]; uint32_t es = common::get_data_type_size( value_columns_[c]->chunk_header.data_type_); - ret_tsblock->get_vector(c + 1) - ->get_value_data() - .append_fixed_value( - cp.values.data() + - static_cast<size_t>(cp.read_pos) * es, - static_cast<uint32_t>(batch) * es); - cp.read_pos += batch; + row_appender.append( + c + 1, + cp.values.data() + + static_cast<size_t>(cp.read_pos) * es, + es); + cp.read_pos++; } - row_appender.add_rows(static_cast<uint32_t>(batch)); - td.cursor += batch; + td.cursor++; + if (row_limit != nullptr) (*row_limit)--; } } else { while (td.cursor < td.count) { - if (row_appender.remaining() == 0) return E_OVERFLOW; + if (row_limit != nullptr && *row_limit == 0) break; int64_t t = td.times[td.cursor]; + // Filter skip if (need_filter && filter != nullptr && !filter->satisfy_start_end_time(t, t)) { for (uint32_t c = 0; c < num_cols; c++) { @@ -1281,6 +1289,22 @@ continue; } + // Offset skip + if (row_offset != nullptr && *row_offset > 0) { + (*row_offset)--; + for (uint32_t c = 0; c < num_cols; c++) { + auto& cp = chunk_cols_[c][chunk_page_cursor_]; + if (cp.data_num > 0 && !cp.bitmap.empty()) { + int vi = td.cursor; + if ((cp.bitmap[vi / 8] & 0xFF) & + (null_mask_base >> (vi % 8))) + cp.read_pos++; + } + } + td.cursor++; + continue; + } + if (UNLIKELY(!row_appender.add_row())) return E_OVERFLOW; row_appender.append(0, (char*)&t, sizeof(int64_t)); @@ -1306,6 +1330,7 @@ } } td.cursor++; + if (row_limit != nullptr) (*row_limit)--; } } chunk_page_cursor_++;
diff --git a/cpp/src/reader/aligned_chunk_reader.h b/cpp/src/reader/aligned_chunk_reader.h index 9b74c5e..9b6f474 100644 --- a/cpp/src/reader/aligned_chunk_reader.h +++ b/cpp/src/reader/aligned_chunk_reader.h
@@ -81,11 +81,6 @@ PageHeader cur_page_header; std::vector<uint8_t> notnull_bitmap; int32_t cur_value_index = -1; - - std::vector<char> predecoded_values; - int predecoded_count = 0; - int predecoded_read_pos = 0; - bool predecoded = false; }; class AlignedChunkReader : public IChunkReader { @@ -173,29 +168,34 @@ bool has_more_data_multi() const; bool prev_any_value_page_not_finish_multi() const; int get_next_page_multi(common::TsBlock* ret_tsblock, - Filter* oneshoot_filter, common::PageArena& pa); + Filter* oneshoot_filter, common::PageArena& pa, + int64_t min_time_hint, int* row_offset, + int* row_limit); int get_next_page_multi_serial(common::TsBlock* ret_tsblock, Filter* filter, - common::PageArena& pa); + common::PageArena& pa, int64_t min_time_hint, + int* row_offset); int skip_cur_page_multi(); bool cur_page_statisify_filter_multi(Filter* filter); int decode_cur_value_pages_multi(); - int decode_cur_value_page_data_for(ValueColumnState& col); int ensure_value_page_loaded(ValueColumnState& col); static int decompress_and_parse_value_page(ValueColumnState& col); - void predecode_all_timestamps(); int decode_time_value_buf_into_tsblock_multi(common::TsBlock*& ret_tsblock, Filter* filter, - common::PageArena* pa); - int multi_DECODE_TV_BATCH(common::TsBlock* ret_tsblock, - common::RowAppender& row_appender, Filter* filter, - common::PageArena* pa); + common::PageArena* pa, + int* row_offset, + int* row_limit); + int multi_decode_tv_row_by_row(common::TsBlock* ret_tsblock, + common::RowAppender& row_appender, + Filter* filter, common::PageArena* pa, + int* row_offset, int* row_limit); // ── Chunk-level parallel decode methods ───────────────────────────── int scan_chunk_pages(Filter* filter); int decode_chunk_pages(); int scatter_chunk_pages(common::TsBlock* tsblock, common::RowAppender& row_appender, Filter* filter, - common::PageArena* pa); + common::PageArena* pa, int* row_offset, + int* row_limit); void cleanup_chunk_decode(); private:
diff --git a/cpp/src/reader/filter/filter.h b/cpp/src/reader/filter/filter.h index 1146e46..f39dddb 100644 --- a/cpp/src/reader/filter/filter.h +++ b/cpp/src/reader/filter/filter.h
@@ -63,19 +63,6 @@ ASSERT(false); return nullptr; } - - // Batch time filter: evaluate time filter on an array of timestamps. - // Writes true/false into @mask for each element. - // Returns the number of elements that passed (mask[i] == true). - virtual int satisfy_batch_time(const int64_t* times, int count, - bool* mask) { - int pass = 0; - for (int i = 0; i < count; ++i) { - mask[i] = satisfy_start_end_time(times[i], times[i]); - if (mask[i]) ++pass; - } - return pass; - } }; } // namespace storage