blob: 7688ed10d6d3838d2754d8bada1aa2abca6ed0aa [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "aligned_chunk_reader.h"
#include <algorithm>
#include <limits>
#include "common/global.h"
#ifdef ENABLE_THREADS
#include "common/thread_pool.h"
#endif
#include "compress/compressor_factory.h"
#include "encoding/decoder_factory.h"
using namespace common;
namespace storage {
/**
 * Bind this reader to a file and allocate the time-column decoder.
 *
 * @param read_file   file handle used for all subsequent reads (stored, not
 *                    opened or closed here)
 * @param m_name      measurement name; shallow-copied into the reader
 * @param data_type   not used by this function
 * @param time_filter default time filter applied when no one-shot filter is
 *                    supplied to the page readers
 * @return E_OK on success, E_OOM if the time decoder could not be allocated
 */
int AlignedChunkReader::init(ReadFile* read_file, String m_name,
                             TSDataType data_type, Filter* time_filter) {
    // Plain state first: none of these assignments can fail.
    time_uncompressed_buf_ = nullptr;
    time_compressor_ = nullptr;
    time_filter_ = time_filter;
    read_file_ = read_file;
    measurement_name_.shallow_copy_from(m_name);

    // The only fallible step: the decoder allocation.
    time_decoder_ = DecoderFactory::alloc_time_decoder();
    if (!IS_NULL(time_decoder_)) {
        return E_OK;
    }
    return E_OOM;
}
/**
 * Return the reader to a "no chunk loaded" state so it can be reused for
 * another chunk. Decoders and compressors are kept; raw file buffers, pending
 * uncompressed page buffers and all per-chunk cursors are released/cleared.
 */
void AlignedChunkReader::reset() {
    // ---- time column ----
    time_chunk_meta_ = nullptr;
    time_chunk_header_.reset();
    cur_time_page_header_.reset();
    if (char* time_raw = time_in_stream_.get_wrapped_buf()) {
        mem_free(time_raw);
    }
    time_in_stream_.clear_wrapped_buf();
    time_in_stream_.reset();
    file_data_time_buf_size_ = 0;
    time_chunk_visit_offset_ = 0;

    // A page buffer handed out by the compressor must go back through the
    // same compressor.
    const bool time_page_pending =
        time_uncompressed_buf_ != nullptr && time_compressor_ != nullptr;
    if (time_page_pending) {
        time_compressor_->after_uncompress(time_uncompressed_buf_);
        time_uncompressed_buf_ = nullptr;
    }

    // ---- value columns ----
    for (size_t i = 0; i < value_columns_.size(); ++i) {
        auto* column = value_columns_[i];
        const bool value_page_pending = column->uncompressed_buf != nullptr &&
                                        column->compressor != nullptr;
        if (value_page_pending) {
            column->compressor->after_uncompress(column->uncompressed_buf);
            column->uncompressed_buf = nullptr;
        }
        if (char* value_raw = column->in_stream.get_wrapped_buf()) {
            mem_free(value_raw);
        }
        column->in_stream.clear_wrapped_buf();
        column->in_stream.reset();
        column->in.reset();
        column->chunk_header.reset();
        column->cur_page_header.reset();
        column->file_data_buf_size = 0;
        column->chunk_visit_offset = 0;
        column->notnull_bitmap.clear();
        column->cur_value_index = -1;
        column->chunk_meta = nullptr;
    }

    // Drop any chunk-level (pre-decoded) page state as well.
    cleanup_chunk_decode();
}
/**
 * Release everything the reader owns: chunk-level decode state, pending
 * uncompressed page buffers, decoders, compressors, raw file buffers and the
 * per-column state objects.
 *
 * Pending uncompressed buffers are returned through the compressor that
 * produced them *before* that compressor is destroyed. This is done for the
 * time column and for every value column, matching what reset() does.
 */
void AlignedChunkReader::destroy() {
    cleanup_chunk_decode();

    // ---- time column ----
    if (time_uncompressed_buf_ != nullptr && time_compressor_ != nullptr) {
        time_compressor_->after_uncompress(time_uncompressed_buf_);
        time_uncompressed_buf_ = nullptr;
    }
    if (time_decoder_ != nullptr) {
        time_decoder_->~Decoder();
        DecoderFactory::free(time_decoder_);
        time_decoder_ = nullptr;
    }
    if (time_compressor_ != nullptr) {
        time_compressor_->~Compressor();
        CompressorFactory::free(time_compressor_);
        time_compressor_ = nullptr;
    }
    char* buf = time_in_stream_.get_wrapped_buf();
    if (buf != nullptr) {
        mem_free(buf);
        time_in_stream_.clear_wrapped_buf();
    }
    cur_time_page_header_.reset();
    // NOTE(review): explicit destructor call on a member; kept as in the
    // original. Confirm chunk_header_ is not destroyed a second time when the
    // reader object itself goes away.
    chunk_header_.~ChunkHeader();

    // ---- value columns ----
    for (size_t ci = 0; ci < value_columns_.size(); ci++) {
        auto* col = value_columns_[ci];
        // Hand a still-pending uncompressed page back to its compressor while
        // the compressor is alive; otherwise the buffer can no longer be
        // released through after_uncompress().
        if (col->uncompressed_buf != nullptr && col->compressor != nullptr) {
            col->compressor->after_uncompress(col->uncompressed_buf);
            col->uncompressed_buf = nullptr;
        }
        if (col->decoder != nullptr) {
            col->decoder->~Decoder();
            DecoderFactory::free(col->decoder);
            col->decoder = nullptr;
        }
        if (col->compressor != nullptr) {
            col->compressor->~Compressor();
            CompressorFactory::free(col->compressor);
            col->compressor = nullptr;
        }
        buf = col->in_stream.get_wrapped_buf();
        if (buf != nullptr) {
            mem_free(buf);
            col->in_stream.clear_wrapped_buf();
        }
        col->cur_page_header.reset();
        delete col;
    }
    value_columns_.clear();
#ifdef ENABLE_THREADS
    decode_pool_ = nullptr;  // borrowed, not owned
#endif
}
/**
 * Single-value-column convenience wrapper: forwards to
 * load_by_aligned_meta_multi() with a one-element list of value chunk metas.
 *
 * @return whatever load_by_aligned_meta_multi() returns
 */
int AlignedChunkReader::load_by_aligned_meta(ChunkMeta* time_chunk_meta,
                                             ChunkMeta* value_chunk_meta) {
    std::vector<ChunkMeta*> single_value_meta(1, value_chunk_meta);
    return load_by_aligned_meta_multi(time_chunk_meta, single_value_meta);
}
/**
 * Make sure @decoder and @compressor are usable for the next chunk.
 *
 * An object that already exists is reused and only reset; a missing one is
 * allocated from its factory. Note that an existing decoder/compressor is
 * reused without comparing it against @encoding / @data_type / @compression.
 *
 * @param decoder     in/out: reset if non-null, otherwise allocated
 * @param compressor  in/out: reset if non-null, otherwise allocated
 * @return E_OK, or E_OOM if either allocation fails
 */
int AlignedChunkReader::alloc_compressor_and_decoder(
    storage::Decoder*& decoder, storage::Compressor*& compressor,
    TSEncoding encoding, TSDataType data_type, CompressionType compression) {
    if (decoder == nullptr) {
        decoder = DecoderFactory::alloc_value_decoder(encoding, data_type);
        if (IS_NULL(decoder)) {
            return E_OOM;
        }
    } else {
        decoder->reset();
    }

    if (compressor == nullptr) {
        compressor = CompressorFactory::alloc_compressor(compression);
        if (compressor == nullptr) {
            return E_OOM;
        }
    } else {
        compressor->reset(false);
    }
    return E_OK;
}
/**
 * Read the next page with no time hint, no row offset and no row limit.
 * Thin wrapper around get_next_page_multi().
 */
int AlignedChunkReader::get_next_page(TsBlock* ret_tsblock,
                                      Filter* oneshoot_filter, PageArena& pa) {
    // The minimum int64 value is the "no hint" sentinel for min_time_hint.
    const int64_t no_time_hint = std::numeric_limits<int64_t>::min();
    int* const no_row_offset = nullptr;
    int* const no_row_limit = nullptr;
    return get_next_page_multi(ret_tsblock, oneshoot_filter, pa, no_time_hint,
                               no_row_offset, no_row_limit);
}
/**
 * Deserialize the page header located at the current position of a chunk.
 *
 * If the chunk's data area is already fully visited, @cur_page_header is
 * reset and E_OK is returned without reading anything. Otherwise the header
 * is parsed from @in_stream; if parsing fails because the buffer is too
 * short, more bytes are read from the file once and parsing is retried.
 *
 * @param chunk_visit_offset  in/out: advanced by the serialized header size
 *                            when the header was parsed successfully
 * @param override_buf_size   buffer-size bookkeeping to use for the re-read;
 *                            when null the time column's own size field is used
 * @return E_OK on success, otherwise the deserialization error
 */
int AlignedChunkReader::get_cur_page_header(ChunkMeta*& chunk_meta,
                                            common::ByteStream& in_stream,
                                            PageHeader& cur_page_header,
                                            uint32_t& chunk_visit_offset,
                                            ChunkHeader& chunk_header,
                                            int32_t* override_buf_size) {
    int ret = E_OK;
    bool retry = true;
    int cur_page_header_serialized_size = 0;
    // TODO: configurable
    int retry_read_want_size = 1024;

    const bool chunk_fully_visited =
        chunk_visit_offset - chunk_header.serialized_size_ >=
        chunk_header.data_size_;
    if (chunk_fully_visited) {
        cur_page_header.reset();
        return E_OK;
    }

    for (;;) {
        in_stream.mark_read_pos();
        cur_page_header.reset();
        ret = cur_page_header.deserialize_from(
            in_stream, !chunk_has_only_one_page(chunk_header),
            chunk_header.data_type_);
        cur_page_header_serialized_size = in_stream.get_mark_len();

        // Only a "buffer too short" failure is retried, and only once.
        if (!(deserialize_buf_not_enough(ret) && retry)) {
            break;
        }
        retry = false;
        retry_read_want_size += 1024;
        int32_t* buf_size_slot = (override_buf_size != nullptr)
                                     ? override_buf_size
                                     : &file_data_time_buf_size_;
        // do not shrink buffer for page header, otherwise, the buffer is
        // most likely to grow back when reading page data
        if (E_OK != read_from_file_and_rewrap(in_stream, chunk_meta,
                                              chunk_visit_offset,
                                              *buf_size_slot,
                                              retry_read_want_size, false)) {
            break;
        }
    }

    if (IS_SUCC(ret)) {
        // visit a header
        chunk_visit_offset += cur_page_header_serialized_size;
    }
#if DEBUG_SE
    std::cout << "get_cur_page_header, ret=" << ret << ", retry=" << retry
              << ", cur_page_header=" << cur_page_header
              << ", chunk_meta->offset_of_chunk_header_="
              << chunk_meta->offset_of_chunk_header_
              << ", cur_page_header_serialized_size="
              << cur_page_header_serialized_size << std::endl;
#endif
    return ret;
}
/**
 * Read at least @want_size bytes (never fewer than 4096 are requested) from
 * the file, starting at the chunk's current visit offset, and wrap the
 * resulting buffer into @in_stream_.
 *
 * The buffer already wrapped by @in_stream_ is reused. It is reallocated when
 * it is too small, or, if @may_shrink is set, when the request is smaller
 * than a tenth of the current buffer size.
 *
 * @param file_data_buf_size in/out: tracked size of the wrapped buffer
 * @return E_OK on success, E_OOM if reallocation fails, otherwise the error
 *         returned by the file read
 */
int AlignedChunkReader::read_from_file_and_rewrap(
    common::ByteStream& in_stream_, ChunkMeta*& chunk_meta,
    uint32_t& chunk_visit_offset, int32_t& file_data_buf_size, int want_size,
    bool may_shrink) {
    int ret = E_OK;
    const int DEFAULT_READ_SIZE = 4096;  // may use page_size + page_header_size
    const int read_size = std::max(want_size, DEFAULT_READ_SIZE);
    const int64_t offset =
        chunk_meta->offset_of_chunk_header_ + chunk_visit_offset;
    char* file_data_buf = in_stream_.get_wrapped_buf();

    const bool too_small = file_data_buf_size < read_size;
    const bool worth_shrinking =
        may_shrink && read_size < file_data_buf_size / 10;
    if (too_small || worth_shrinking) {
        file_data_buf = (char*)mem_realloc(file_data_buf, read_size);
        if (IS_NULL(file_data_buf)) {
            in_stream_.clear_wrapped_buf();
            return E_OOM;
        }
        file_data_buf_size = read_size;
        in_stream_.wrap_from(file_data_buf, read_size);
    }

    int ret_read_len = 0;
    ret = read_file_->read(offset, file_data_buf, read_size, ret_read_len);
    if (IS_SUCC(ret)) {
        // Expose only the bytes that were actually read.
        in_stream_.wrap_from(file_data_buf, ret_read_len);
#ifdef DEBUG_SE
        std::cout << "file offset = " << offset << " len = " << ret_read_len
                  << std::endl;
        DEBUG_hex_dump_buf("wrapped buf = ", file_data_buf, 256);
#endif
    }
    return ret;
}
/**
 * Load and uncompress the current time page, then point time_in_ at the
 * uncompressed bytes so the time decoder can consume them.
 *
 * Steps:
 *  1. If the raw stream does not hold the whole compressed page, re-read it
 *     from the file. A read that still delivers fewer bytes than the page
 *     header announces is reported as E_TSFILE_CORRUPTED instead of being
 *     decompressed from an incomplete buffer.
 *  2. Uncompress the page and verify the uncompressed size against the page
 *     header.
 *
 * @return E_OK on success; E_TSFILE_CORRUPTED on short data, decompression
 *         failure or size mismatch; otherwise the file-read error
 */
int AlignedChunkReader::decode_cur_time_page_data() {
    int ret = E_OK;
    // Step 1: make sure we load the whole page data in @time_in_stream_
    if (time_in_stream_.remaining_size() <
        cur_time_page_header_.compressed_size_) {
        if (RET_FAIL(read_from_file_and_rewrap(
                time_in_stream_, time_chunk_meta_, time_chunk_visit_offset_,
                file_data_time_buf_size_,
                cur_time_page_header_.compressed_size_))) {
        } else if (time_in_stream_.remaining_size() <
                   cur_time_page_header_.compressed_size_) {
            // The file ended before the page did.
            ret = E_TSFILE_CORRUPTED;
        }
    }

    char* time_compressed_buf = nullptr;
    char* time_uncompressed_buf = nullptr;
    uint32_t time_compressed_buf_size = 0;
    uint32_t time_uncompressed_buf_size = 0;

    // Step 2: do uncompress
    if (IS_SUCC(ret)) {
        time_compressed_buf =
            time_in_stream_.get_wrapped_buf() + time_in_stream_.read_pos();
#ifdef DEBUG_SE
        std::cout << "AlignedChunkReader::decode_cur_page_data,time_in_stream_."
                     "get_wrapped_buf="
                  << (void*)(time_in_stream_.get_wrapped_buf())
                  << ", time_in_stream_.read_pos=" << time_in_stream_.read_pos()
                  << std::endl;
#endif
        time_compressed_buf_size = cur_time_page_header_.compressed_size_;
        time_in_stream_.wrapped_buf_advance_read_pos(time_compressed_buf_size);
        time_chunk_visit_offset_ += time_compressed_buf_size;
        if (RET_FAIL(time_compressor_->reset(false))) {
        } else if (RET_FAIL(time_compressor_->uncompress(
                       time_compressed_buf, time_compressed_buf_size,
                       time_uncompressed_buf, time_uncompressed_buf_size))) {
        } else {
            // Remember the buffer so it can be returned to the compressor
            // once the page has been consumed.
            time_uncompressed_buf_ = time_uncompressed_buf;
        }
#ifdef DEBUG_SE
        DEBUG_hex_dump_buf(
            "AlignedChunkReader reader, time_uncompressed buf = ",
            time_uncompressed_buf, time_uncompressed_buf_size);
#endif
        if (ret != E_OK || time_uncompressed_buf_size !=
                               cur_time_page_header_.uncompressed_size_) {
            ret = E_TSFILE_CORRUPTED;
            ASSERT(false);
        }
    }

    time_decoder_->reset();
#ifdef DEBUG_SE
    // Dump the buffer that is about to be wrapped (the previous code named
    // variables that do not exist in this function).
    DEBUG_hex_dump_buf("AlignedChunkReader reader, time_buf = ",
                       time_uncompressed_buf_, time_uncompressed_buf_size);
#endif
    time_in_.wrap_from(time_uncompressed_buf_, time_uncompressed_buf_size);
    return ret;
}
/**
 * Read the next page honouring a time hint, a row offset and a row limit.
 *
 * @param row_offset in/out, forwarded by address to get_next_page_multi()
 * @param row_limit  in/out, forwarded by address; a value of 0 short-circuits
 *                   with E_NO_MORE_DATA
 */
int AlignedChunkReader::get_next_page(TsBlock* ret_tsblock,
                                      Filter* oneshoot_filter, PageArena& pa,
                                      int64_t min_time_hint, int& row_offset,
                                      int& row_limit) {
    if (row_limit != 0) {
        int* offset_ptr = &row_offset;
        int* limit_ptr = &row_limit;
        return get_next_page_multi(ret_tsblock, oneshoot_filter, pa,
                                   min_time_hint, offset_ptr, limit_ptr);
    }
    // Nothing left to hand out.
    return E_NO_MORE_DATA;
}
/**
 * Load the chunk headers of one time chunk and of N aligned value chunks and
 * prepare a decoder/compressor pair for each of them.
 *
 * For every chunk a 1024-byte buffer is read at the chunk header offset,
 * wrapped into the chunk's raw stream and the header is deserialized from it.
 * Once a buffer is wrapped into a stream it is owned by that stream and is
 * released by reset()/destroy(); a buffer that never got wrapped (read error
 * or too few bytes) is freed here so it does not leak.
 *
 * @param time_chunk_meta meta of the time chunk
 * @param value_metas     metas of the value chunks, one per column
 * @return E_OK on success; E_OOM on allocation failure; E_TSFILE_CORRUPTED if
 *         fewer than ChunkHeader::MIN_SERIALIZED_SIZE bytes could be read;
 *         otherwise the read/deserialize/alloc error
 */
int AlignedChunkReader::load_by_aligned_meta_multi(
    ChunkMeta* time_chunk_meta, const std::vector<ChunkMeta*>& value_metas) {
    int ret = E_OK;
    time_chunk_meta_ = time_chunk_meta;

    // ── Load time chunk header ──
    file_data_time_buf_size_ = 1024;
    int32_t ret_read_len = 0;
    char* time_file_data_buf =
        (char*)mem_alloc(file_data_time_buf_size_, MOD_CHUNK_READER);
    if (IS_NULL(time_file_data_buf)) return E_OOM;
    ret = read_file_->read(time_chunk_meta_->offset_of_chunk_header_,
                           time_file_data_buf, file_data_time_buf_size_,
                           ret_read_len);
    if (IS_FAIL(ret)) {
        // Not wrapped yet: nobody else would free this buffer.
        mem_free(time_file_data_buf);
        return ret;
    }
    if (ret_read_len < ChunkHeader::MIN_SERIALIZED_SIZE) {
        mem_free(time_file_data_buf);
        return E_TSFILE_CORRUPTED;
    }
    time_in_stream_.wrap_from(time_file_data_buf, ret_read_len);
    if (RET_FAIL(time_chunk_header_.deserialize_from(time_in_stream_))) {
        return ret;
    }
    time_chunk_visit_offset_ = time_in_stream_.read_pos();

    // Alloc time decoder/compressor
    if (RET_FAIL(alloc_compressor_and_decoder(
            time_decoder_, time_compressor_,
            time_chunk_header_.encoding_type_, time_chunk_header_.data_type_,
            time_chunk_header_.compression_type_))) {
        return ret;
    }

    // ── Load each value column ──
    // Column state objects are reused when the column count is unchanged.
    // NOTE(review): replaced states are only `delete`d here; confirm that
    // their decoder/compressor are released elsewhere.
    if (value_columns_.size() != value_metas.size()) {
        for (auto* p : value_columns_) delete p;
        value_columns_.clear();
        value_columns_.reserve(value_metas.size());
        for (size_t c = 0; c < value_metas.size(); c++) {
            value_columns_.push_back(new ValueColumnState);
        }
    }
    for (size_t c = 0; c < value_metas.size(); c++) {
        auto* col = value_columns_[c];
        col->chunk_meta = value_metas[c];
        col->file_data_buf_size = 1024;
        ret_read_len = 0;
        char* vbuf =
            (char*)mem_alloc(col->file_data_buf_size, MOD_CHUNK_READER);
        if (IS_NULL(vbuf)) return E_OOM;
        ret = read_file_->read(col->chunk_meta->offset_of_chunk_header_, vbuf,
                               col->file_data_buf_size, ret_read_len);
        if (IS_FAIL(ret)) {
            // Not wrapped yet: free it here.
            mem_free(vbuf);
            break;
        }
        if (ret_read_len < ChunkHeader::MIN_SERIALIZED_SIZE) {
            ret = E_TSFILE_CORRUPTED;
            mem_free(vbuf);
            break;
        }
        col->in_stream.wrap_from(vbuf, ret_read_len);
        if (RET_FAIL(col->chunk_header.deserialize_from(col->in_stream))) {
            break;
        }
        col->chunk_visit_offset = col->in_stream.read_pos();
        if (RET_FAIL(alloc_compressor_and_decoder(
                col->decoder, col->compressor,
                col->chunk_header.encoding_type_,
                col->chunk_header.data_type_,
                col->chunk_header.compression_type_))) {
            break;
        }
    }
    return ret;
}
/**
 * Tell whether any data is still pending for the loaded chunk.
 *
 * True when chunk-level pages are still being scattered, when a decoded
 * time/value page is only partly consumed, or when the time chunk or any
 * value chunk has not been visited up to the end of its data area.
 */
bool AlignedChunkReader::has_more_data_multi() const {
    // Already-decoded data waiting to be emitted.
    const bool decoded_data_pending = chunk_level_active_ ||
                                      prev_time_page_not_finish() ||
                                      prev_any_value_page_not_finish_multi();
    if (decoded_data_pending) {
        return true;
    }

    // Unvisited bytes in the time chunk.
    const bool time_chunk_unfinished =
        time_chunk_visit_offset_ - time_chunk_header_.serialized_size_ <
        time_chunk_header_.data_size_;
    if (time_chunk_unfinished) {
        return true;
    }

    // Unvisited bytes in any value chunk.
    for (size_t i = 0; i < value_columns_.size(); ++i) {
        const auto* column = value_columns_[i];
        if (column->chunk_visit_offset -
                column->chunk_header.serialized_size_ <
            column->chunk_header.data_size_) {
            return true;
        }
    }
    return false;
}
/**
 * @return true if at least one value column still has unconsumed data,
 *         either reported by its decoder or left in its decoded-page stream.
 */
bool AlignedChunkReader::prev_any_value_page_not_finish_multi() const {
    for (size_t i = 0; i < value_columns_.size(); ++i) {
        const auto* column = value_columns_[i];
        const bool decoder_pending =
            column->decoder != nullptr &&
            column->decoder->has_remaining(column->in);
        if (decoder_pending || column->in.has_remaining()) {
            return true;
        }
    }
    return false;
}
/**
 * Fill @ret_tsblock with the next batch of rows of the loaded aligned chunk.
 *
 * Dispatch order:
 *  1. A chunk-level decode left unfinished by a previous call is resumed.
 *  2. With ENABLE_THREADS, a decode pool, more than one value column, a
 *     multi-page chunk and a compressed time chunk, the whole chunk is
 *     scanned, decoded and then scattered into the block.
 *  3. Otherwise the serial page-by-page path is used. Note that @row_limit is
 *     not forwarded to that path.
 *
 * @param oneshoot_filter filter for this call; falls back to the filter given
 *                        to init() when null
 * @param row_offset      optional in/out count of rows still to skip
 * @param row_limit       optional in/out count of rows still allowed; 0
 *                        yields E_NO_MORE_DATA immediately
 * @return E_OK when rows were produced or the block filled up (E_OVERFLOW
 *         from the scatter step is translated to E_OK and the decode state is
 *         kept), E_NO_MORE_DATA, or an error code
 */
int AlignedChunkReader::get_next_page_multi(TsBlock* ret_tsblock,
                                            Filter* oneshoot_filter,
                                            PageArena& pa,
                                            int64_t min_time_hint,
                                            int* row_offset, int* row_limit) {
    int ret = E_OK;
    Filter* filter = oneshoot_filter;
    if (filter == nullptr) {
        filter = time_filter_;
    }
    if (row_limit != nullptr && *row_limit == 0) return E_NO_MORE_DATA;

    // One scatter step: a full block keeps the pre-decoded pages for the next
    // call, any other outcome releases them.
    auto scatter_step = [&]() -> int {
        RowAppender row_appender(ret_tsblock);
        const int rc = scatter_chunk_pages(ret_tsblock, row_appender, filter,
                                           &pa, row_offset, row_limit);
        if (rc == E_OVERFLOW) {
            return E_OK;
        }
        cleanup_chunk_decode();
        return rc;
    };

    // Resume chunk-level scatter from previous E_OVERFLOW.
    if (chunk_level_active_) {
        return scatter_step();
    }
#ifdef ENABLE_THREADS
    // Chunk-level parallel path for multi-page compressed chunks.
    const bool use_chunk_level_path =
        decode_pool_ != nullptr && value_columns_.size() > 1 &&
        !chunk_has_only_one_page(time_chunk_header_) &&
        time_chunk_header_.compression_type_ != common::UNCOMPRESSED;
    if (use_chunk_level_path) {
        ret = scan_chunk_pages(filter);
        if (IS_FAIL(ret)) return ret;
        if (chunk_pages_.empty()) return E_NO_MORE_DATA;
        ret = decode_chunk_pages();
        if (IS_FAIL(ret)) {
            cleanup_chunk_decode();
            return ret;
        }
        chunk_level_active_ = true;
        return scatter_step();
    }
#endif
    // Serial fallback.
    return get_next_page_multi_serial(ret_tsblock, filter, pa, min_time_hint,
                                      row_offset);
}
// Serial (page-by-page) read path.
//
// If both the time page and a value page are still partly consumed, decoding
// simply continues from them. If neither is pending, page headers are read
// for the time column and every value column, and pages are skipped while
// they are rejected by the filter, end before @min_time_hint, or are fully
// covered by the remaining @row_offset; the first page that survives is
// decompressed. Finally rows are decoded into @ret_tsblock.
//
// @param filter        time filter; may be null
// @param min_time_hint pages whose end time is below this value are skipped;
//                      the minimum int64 value disables the check
// @param row_offset    optional in/out count of rows still to skip
// @return E_OK, E_NO_MORE_DATA when skipping exhausted the chunk, or an error
//         code from header reading / decoding
int AlignedChunkReader::get_next_page_multi_serial(TsBlock* ret_tsblock,
Filter* filter,
PageArena& pa,
int64_t min_time_hint,
int* row_offset) {
int ret = E_OK;
// pt / pv: a previously decoded time / value page still has unread data.
bool pt = prev_time_page_not_finish();
bool pv = prev_any_value_page_not_finish_multi();
if (pt && pv) {
// Continue the page that is already decoded; no row limit is passed.
ret = decode_time_value_buf_into_tsblock_multi(ret_tsblock, filter, &pa,
row_offset, nullptr);
return ret;
}
if (!pt && !pv) {
// Nothing pending: locate the next page that has to be decoded.
while (IS_SUCC(ret)) {
if (RET_FAIL(get_cur_page_header(
time_chunk_meta_, time_in_stream_, cur_time_page_header_,
time_chunk_visit_offset_, time_chunk_header_))) {
break;
}
// Read the matching page header of every value column; the loop stops at
// the first failure and leaves the error in ret.
for (size_t c = 0; c < value_columns_.size() && IS_SUCC(ret); c++) {
auto* col = value_columns_[c];
if (RET_FAIL(get_cur_page_header(
col->chunk_meta, col->in_stream, col->cur_page_header,
col->chunk_visit_offset, col->chunk_header,
&col->file_data_buf_size))) {
}
}
if (IS_FAIL(ret)) break;
if (!cur_page_statisify_filter_multi(filter)) {
// Page rejected by the time filter.
if (RET_FAIL(skip_cur_page_multi())) break;
} else if (min_time_hint != std::numeric_limits<int64_t>::min() &&
cur_time_page_header_.statistic_ != nullptr &&
cur_time_page_header_.statistic_->end_time_ <
min_time_hint) {
// Skip page whose time range is entirely before hint.
if (RET_FAIL(skip_cur_page_multi())) break;
} else if (row_offset != nullptr && *row_offset > 0 &&
cur_time_page_header_.statistic_ != nullptr &&
cur_time_page_header_.statistic_->count_ > 0 &&
*row_offset >=
cur_time_page_header_.statistic_->count_) {
// Skip entire page by offset.
*row_offset -= cur_time_page_header_.statistic_->count_;
if (RET_FAIL(skip_cur_page_multi())) break;
} else {
// This page must be decoded.
break;
}
// A page was skipped; stop if the chunk has nothing left.
if (!has_more_data()) {
ret = E_NO_MORE_DATA;
break;
}
}
if (IS_SUCC(ret)) {
// Decompress the time page first, then the value pages.
ret = decode_cur_time_page_data();
if (IS_SUCC(ret)) ret = decode_cur_value_pages_multi();
}
}
// Also reached when only one of pt / pv is set: decode from whatever is
// currently buffered.
if (IS_SUCC(ret)) {
ret = decode_time_value_buf_into_tsblock_multi(ret_tsblock, filter, &pa,
row_offset, nullptr);
}
return ret;
}
/**
 * Decide from the current time page's statistics whether the page may
 * contain rows accepted by @filter.
 *
 * @return true when there is no filter, when the page carries no statistic,
 *         or when the filter accepts the statistic
 */
bool AlignedChunkReader::cur_page_statisify_filter_multi(Filter* filter) {
    if (filter == nullptr) {
        return true;
    }
    if (cur_time_page_header_.statistic_ == nullptr) {
        return true;
    }
    return filter->satisfy(cur_time_page_header_.statistic_);
}
/**
 * Skip the body of the current page in the time column and in every value
 * column: both the chunk visit offset and the raw stream read position are
 * advanced by the page's compressed size.
 *
 * @return always E_OK
 */
int AlignedChunkReader::skip_cur_page_multi() {
    const auto time_page_bytes = cur_time_page_header_.compressed_size_;
    time_in_stream_.wrapped_buf_advance_read_pos(time_page_bytes);
    time_chunk_visit_offset_ += time_page_bytes;

    for (size_t i = 0; i < value_columns_.size(); ++i) {
        auto* column = value_columns_[i];
        const auto value_page_bytes = column->cur_page_header.compressed_size_;
        column->in_stream.wrapped_buf_advance_read_pos(value_page_bytes);
        column->chunk_visit_offset += value_page_bytes;
    }
    return E_OK;
}
/**
 * Prepare the current page of every value column for row decoding.
 *
 * Phase 1 makes sure each column's compressed page is in memory (one column
 * after another). Phase 2 decompresses and parses each page; with
 * ENABLE_THREADS, a decode pool and more than one column the per-column work
 * is submitted to the pool and awaited, otherwise it runs in a plain loop.
 *
 * @return E_OK, or the first failing column's error code
 */
int AlignedChunkReader::decode_cur_value_pages_multi() {
    int ret = E_OK;
    const size_t column_count = value_columns_.size();

    // Phase 1: Serial IO — ensure each column's page data is in memory.
    for (size_t i = 0; i < column_count; ++i) {
        ret = ensure_value_page_loaded(*value_columns_[i]);
        if (IS_FAIL(ret)) return ret;
    }

    // Phase 2: Parallel CPU — decompress + parse bitmap + reset decoder.
#ifdef ENABLE_THREADS
    if (column_count > 1 && decode_pool_ != nullptr) {
        std::vector<int> statuses(column_count, E_OK);
        for (size_t i = 0; i < column_count; ++i) {
            auto* column = value_columns_[i];
            int* status_slot = &statuses[i];
            decode_pool_->submit([column, status_slot] {
                *status_slot = decompress_and_parse_value_page(*column);
            });
        }
        decode_pool_->wait_all();
        // Report the first failure in column order.
        for (size_t i = 0; i < statuses.size(); ++i) {
            if (IS_FAIL(statuses[i])) return statuses[i];
        }
        return ret;
    }
#endif
    for (size_t i = 0; i < column_count; ++i) {
        ret = decompress_and_parse_value_page(*value_columns_[i]);
        if (IS_FAIL(ret)) break;
    }
    return ret;
}
/**
 * Make sure the raw stream of @col holds the complete compressed body of the
 * column's current page, re-reading from the file when it does not.
 *
 * @return E_OK when the page is fully buffered; the read error if the reload
 *         fails; E_TSFILE_CORRUPTED if the reload succeeds but still delivers
 *         fewer bytes than the page header announces (previously this case
 *         went on to decompress an incomplete buffer)
 */
int AlignedChunkReader::ensure_value_page_loaded(ValueColumnState& col) {
    int ret = E_OK;
    if (col.in_stream.remaining_size() >=
        col.cur_page_header.compressed_size_) {
        return ret;  // already buffered
    }
    if (RET_FAIL(read_from_file_and_rewrap(
            col.in_stream, col.chunk_meta, col.chunk_visit_offset,
            col.file_data_buf_size, col.cur_page_header.compressed_size_))) {
        return ret;
    }
    if (col.in_stream.remaining_size() <
        col.cur_page_header.compressed_size_) {
        // The file ended before the page did.
        return E_TSFILE_CORRUPTED;
    }
    return ret;
}
/**
 * Decompress the current page of one value column and split it into its
 * parts: a 32-bit row count, the not-null bitmap ((rows + 7) / 8 bytes) and
 * the encoded values, which are wrapped into col.in for the decoder.
 *
 * The raw stream position and the chunk visit offset are advanced past the
 * compressed page. The uncompressed buffer is stored in col.uncompressed_buf
 * as soon as decompression succeeds, so the caller's cleanup releases it even
 * when this function reports an error afterwards.
 *
 * All lengths taken from the page are validated against the uncompressed
 * size before they are used, so a damaged page yields E_TSFILE_CORRUPTED
 * instead of reading past the buffer.
 *
 * @return E_OK (also for an empty page), E_TSFILE_CORRUPTED on size
 *         mismatch or inconsistent layout, or the compressor's error code
 */
int AlignedChunkReader::decompress_and_parse_value_page(ValueColumnState& col) {
    int ret = E_OK;
    if (col.cur_page_header.compressed_size_ == 0) {
        // Empty page: nothing to decode for this column.
        col.in.wrap_from(nullptr, 0);
        return E_OK;
    }

    // Decompress
    char* compressed_buf =
        col.in_stream.get_wrapped_buf() + col.in_stream.read_pos();
    uint32_t compressed_size = col.cur_page_header.compressed_size_;
    col.in_stream.wrapped_buf_advance_read_pos(compressed_size);
    col.chunk_visit_offset += compressed_size;
    char* uncompressed_buf = nullptr;
    uint32_t uncompressed_size = 0;
    if (RET_FAIL(col.compressor->reset(false))) {
        return ret;
    }
    if (RET_FAIL(col.compressor->uncompress(compressed_buf, compressed_size,
                                            uncompressed_buf,
                                            uncompressed_size))) {
        return ret;
    }
    col.uncompressed_buf = uncompressed_buf;
    if (uncompressed_size != col.cur_page_header.uncompressed_size_) {
        return E_TSFILE_CORRUPTED;
    }

    // Parse bitmap + value data
    if (uncompressed_size < sizeof(uint32_t)) {
        // Not even room for the row count.
        return E_TSFILE_CORRUPTED;
    }
    uint32_t offset = 0;
    uint32_t data_num = SerializationUtil::read_ui32(uncompressed_buf);
    offset += sizeof(uint32_t);
    // 64-bit arithmetic so a huge data_num cannot wrap to a small size.
    const uint64_t bitmap_bytes = (static_cast<uint64_t>(data_num) + 7) / 8;
    if (bitmap_bytes > uncompressed_size - offset) {
        return E_TSFILE_CORRUPTED;
    }
    col.notnull_bitmap.resize(static_cast<size_t>(bitmap_bytes));
    for (size_t i = 0; i < col.notnull_bitmap.size(); i++) {
        col.notnull_bitmap[i] = *(uncompressed_buf + offset);
        offset++;
    }
    col.cur_value_index = -1;  // incremented before each row is examined
    char* value_buf = uncompressed_buf + offset;
    uint32_t value_buf_size = uncompressed_size - offset;
    col.decoder->reset();
    col.in.wrap_from(value_buf, value_buf_size);
    return ret;
}
/**
 * Decode rows from the currently buffered time/value pages into
 * @ret_tsblock and release the page buffers afterwards.
 *
 * When the row decoder reports E_OVERFLOW (block full) the buffers are kept
 * for the next call and E_OK is returned. For every other result the
 * uncompressed page buffers are handed back to their compressors, the
 * not-null bitmaps are dropped and fully consumed streams are reset.
 *
 * NOTE(review): the buffers are released for any non-overflow result,
 * including a stop caused by @row_limit; the callers visible in this file
 * pass a null @row_limit. Confirm before passing a real limit.
 *
 * @return E_OK, or the row decoder's error code
 */
int AlignedChunkReader::decode_time_value_buf_into_tsblock_multi(
    TsBlock*& ret_tsblock, Filter* filter, PageArena* pa, int* row_offset,
    int* row_limit) {
    RowAppender row_appender(ret_tsblock);
    int ret = multi_decode_tv_row_by_row(ret_tsblock, row_appender, filter,
                                         pa, row_offset, row_limit);
    if (ret == E_OVERFLOW) {
        // Block is full: keep every buffer, the page continues next call.
        return E_OK;
    }

    // Release uncompressed buffers if pages are done
    if (time_uncompressed_buf_ != nullptr) {
        time_compressor_->after_uncompress(time_uncompressed_buf_);
        time_uncompressed_buf_ = nullptr;
    }
    for (size_t i = 0; i < value_columns_.size(); ++i) {
        auto* column = value_columns_[i];
        if (column->uncompressed_buf != nullptr) {
            column->compressor->after_uncompress(column->uncompressed_buf);
            column->uncompressed_buf = nullptr;
        }
        const bool decoder_pending =
            column->decoder && column->decoder->has_remaining(column->in);
        if (!decoder_pending && !column->in.has_remaining()) {
            column->in.reset();
        }
        column->notnull_bitmap.clear();
        column->notnull_bitmap.shrink_to_fit();
    }
    if (!prev_time_page_not_finish()) {
        time_in_.reset();
    }
    return ret;
}
/**
 * Decode the buffered time page row by row and append each accepted row
 * (timestamp plus one value or null per column) to @row_appender.
 *
 * For every timestamp the per-column value index is advanced; a column
 * contributes a value only if its not-null bitmap marks that index and its
 * decoder still has data. Rows rejected by @filter or consumed by
 * @row_offset are dropped, but their non-null values are still read so the
 * value decoders stay in step with the time decoder.
 *
 * The bitmap lookup is bounds-checked: an index outside the bitmap (e.g. a
 * time page with more rows than a value page) is treated as null instead of
 * reading past the end of the bitmap.
 *
 * Return codes of the individual value reads are not inspected.
 *
 * @param row_offset optional in/out count of accepted rows still to skip
 * @param row_limit  optional in/out count of rows still allowed to be emitted
 * @return E_OK when the time page is exhausted or the limit is reached,
 *         E_OVERFLOW when the block has no room for another row, or the time
 *         decoder's error code
 */
int AlignedChunkReader::multi_decode_tv_row_by_row(
    TsBlock* ret_tsblock, RowAppender& row_appender, Filter* filter,
    PageArena* pa, int* row_offset, int* row_limit) {
    int ret = E_OK;
    const uint32_t num_cols = value_columns_.size();
    int64_t time = 0;

    // True if the column's bitmap marks the current row as non-null. Bits are
    // stored most-significant-bit first within each byte.
    auto value_present = [](const ValueColumnState* col) -> bool {
        const uint32_t null_mask_base = 1 << 7;
        const int vi = col->cur_value_index;
        if (vi < 0) return false;
        const size_t byte_idx = static_cast<size_t>(vi) / 8;
        if (byte_idx >= col->notnull_bitmap.size()) return false;
        return ((col->notnull_bitmap[byte_idx] & 0xFF) &
                (null_mask_base >> (vi % 8))) != 0;
    };

    // Read one value of the column's type and throw it away.
    auto skip_value = [](ValueColumnState* col, common::PageArena* pa) {
        switch (col->chunk_header.data_type_) {
            case common::BOOLEAN: {
                bool d;
                col->decoder->read_boolean(d, col->in);
                break;
            }
            case common::INT32:
            case common::DATE: {
                int32_t d;
                col->decoder->read_int32(d, col->in);
                break;
            }
            case common::INT64:
            case common::TIMESTAMP: {
                int64_t d;
                col->decoder->read_int64(d, col->in);
                break;
            }
            case common::FLOAT: {
                float d;
                col->decoder->read_float(d, col->in);
                break;
            }
            case common::DOUBLE: {
                double d;
                col->decoder->read_double(d, col->in);
                break;
            }
            case common::STRING:
            case common::TEXT:
            case common::BLOB: {
                common::String d;
                col->decoder->read_String(d, *pa, col->in);
                break;
            }
            default:
                break;
        }
    };

    // Read one value of the column's type and append it to column @slot of
    // the current row; unknown types append a null.
    auto read_and_append_value = [&](ValueColumnState* col, uint32_t slot,
                                     RowAppender& ra, common::PageArena* pa) {
        switch (col->chunk_header.data_type_) {
            case common::BOOLEAN: {
                bool v;
                col->decoder->read_boolean(v, col->in);
                ra.append(slot, (char*)&v, sizeof(v));
                break;
            }
            case common::INT32:
            case common::DATE: {
                int32_t v;
                col->decoder->read_int32(v, col->in);
                ra.append(slot, (char*)&v, sizeof(v));
                break;
            }
            case common::INT64:
            case common::TIMESTAMP: {
                int64_t v;
                col->decoder->read_int64(v, col->in);
                ra.append(slot, (char*)&v, sizeof(v));
                break;
            }
            case common::FLOAT: {
                float v;
                col->decoder->read_float(v, col->in);
                ra.append(slot, (char*)&v, sizeof(v));
                break;
            }
            case common::DOUBLE: {
                double v;
                col->decoder->read_double(v, col->in);
                ra.append(slot, (char*)&v, sizeof(v));
                break;
            }
            case common::STRING:
            case common::TEXT:
            case common::BLOB: {
                common::String v;
                col->decoder->read_String(v, *pa, col->in);
                ra.append(slot, v.buf_, v.len_);
                break;
            }
            default:
                ra.append_null(slot);
                break;
        }
    };

    while (time_decoder_->has_remaining(time_in_)) {
        if (row_limit != nullptr && *row_limit == 0) break;
        // Check capacity BEFORE consuming timestamp
        if (UNLIKELY(!row_appender.add_row())) {
            ret = E_OVERFLOW;
            break;
        }
        ret = time_decoder_->read_int64(time, time_in_);
        if (ret != E_OK) {
            row_appender.backoff_add_row();
            break;
        }
        // Advance value index for all columns
        for (uint32_t c = 0; c < num_cols; c++) {
            value_columns_[c]->cur_value_index++;
        }
        // Time filter — skip row
        bool skip_row =
            (filter != nullptr && !filter->satisfy_start_end_time(time, time));
        // Offset skip — skip row but count it
        if (!skip_row && row_offset != nullptr && *row_offset > 0) {
            (*row_offset)--;
            skip_row = true;
        }
        if (skip_row) {
            // Give the reserved row back, but still consume the row's values.
            row_appender.backoff_add_row();
            for (uint32_t c = 0; c < num_cols; c++) {
                auto* col = value_columns_[c];
                if (value_present(col) &&
                    col->decoder->has_remaining(col->in)) {
                    skip_value(col, pa);
                }
            }
            continue;
        }
        // Slot 0 is the timestamp; value column c goes to slot c + 1.
        row_appender.append(0, (char*)&time, sizeof(time));
        for (uint32_t c = 0; c < num_cols; c++) {
            auto* col = value_columns_[c];
            if (!value_present(col) ||
                !col->decoder->has_remaining(col->in)) {
                row_appender.append_null(c + 1);
            } else {
                read_and_append_value(col, c + 1, row_appender, pa);
            }
        }
        if (row_limit != nullptr) (*row_limit)--;
    }
    return ret;
}
// ═══════════════════════════════════════════════════════════════════════════
// Chunk-level parallel decode
// ═══════════════════════════════════════════════════════════════════════════
/**
 * Discard all chunk-level decode state: free every pre-decoded value page's
 * uncompressed buffer, clear the page/time/column tables, rewind the page
 * cursor and mark the chunk-level path as inactive.
 */
void AlignedChunkReader::cleanup_chunk_decode() {
    for (auto& column_pages : chunk_cols_) {
        for (auto& page : column_pages) {
            if (page.uncompressed_buf == nullptr) {
                continue;
            }
            common::mem_free(page.uncompressed_buf);
            page.uncompressed_buf = nullptr;
        }
    }
    chunk_cols_.clear();
    chunk_times_.clear();
    chunk_pages_.clear();
    chunk_level_active_ = false;
    chunk_page_cursor_ = 0;
}
// Walk all remaining pages of the chunk, reading only page headers, and
// record for every page that is not rejected by @filter where its compressed
// body lives in the file (time column and each value column) together with
// its compressed/uncompressed sizes. Page bodies are skipped, not decoded.
//
// On return chunk_pages_ holds one entry per recorded page, and
// chunk_times_ / chunk_cols_ are sized to match so decode_chunk_pages() can
// fill them. chunk_page_cursor_ is reset to 0.
//
// @param filter time filter; null means every page is recorded as FULL_PASS
// @return E_OK, or the first error from reading a page header
int AlignedChunkReader::scan_chunk_pages(Filter* filter) {
int ret = E_OK;
const uint32_t num_cols = value_columns_.size();
chunk_pages_.clear();
while (IS_SUCC(ret)) {
// Stop once the time chunk's data area has been fully visited.
if (time_chunk_visit_offset_ - time_chunk_header_.serialized_size_ >=
time_chunk_header_.data_size_)
break;
if (RET_FAIL(get_cur_page_header(
time_chunk_meta_, time_in_stream_, cur_time_page_header_,
time_chunk_visit_offset_, time_chunk_header_)))
break;
// A header with both sizes zero is treated as "no further page".
if (cur_time_page_header_.compressed_size_ == 0 &&
cur_time_page_header_.uncompressed_size_ == 0)
break;
// Read the matching page header of every value column; the loop stops at
// the first failure and leaves the error in ret.
for (size_t c = 0; c < num_cols && IS_SUCC(ret); c++) {
auto* col = value_columns_[c];
if (RET_FAIL(get_cur_page_header(
col->chunk_meta, col->in_stream, col->cur_page_header,
col->chunk_visit_offset, col->chunk_header,
&col->file_data_buf_size))) {
}
}
if (IS_FAIL(ret)) break;
// Classify the page against the filter using the time page statistic:
// SKIP when the filter rejects it, FULL_PASS when the filter contains the
// page's whole [start, end] range (or there is no filter/statistic),
// BOUNDARY otherwise.
Statistic* stat = cur_time_page_header_.statistic_;
PagePassType pt;
if (filter == nullptr || stat == nullptr) {
pt = PagePassType::FULL_PASS;
} else if (!filter->satisfy(stat)) {
pt = PagePassType::SKIP;
} else if (filter->contain_start_end_time(stat->start_time_,
stat->end_time_)) {
pt = PagePassType::FULL_PASS;
} else {
pt = PagePassType::BOUNDARY;
}
if (pt != PagePassType::SKIP) {
// Record the page. The visit offsets currently point just past the page
// headers, i.e. at the start of the compressed bodies.
ChunkPageInfo info;
info.pass_type = pt;
info.time_file_offset = time_chunk_meta_->offset_of_chunk_header_ +
time_chunk_visit_offset_;
info.time_compressed_size = cur_time_page_header_.compressed_size_;
info.time_uncompressed_size =
cur_time_page_header_.uncompressed_size_;
info.value_file_offsets.resize(num_cols);
info.value_compressed_sizes.resize(num_cols);
info.value_uncompressed_sizes.resize(num_cols);
for (size_t c = 0; c < num_cols; c++) {
auto* col = value_columns_[c];
info.value_file_offsets[c] =
col->chunk_meta->offset_of_chunk_header_ +
col->chunk_visit_offset;
info.value_compressed_sizes[c] =
col->cur_page_header.compressed_size_;
info.value_uncompressed_sizes[c] =
col->cur_page_header.uncompressed_size_;
}
chunk_pages_.push_back(std::move(info));
}
// Step over the page bodies (recorded or not) so the next iteration lands
// on the following page header.
time_chunk_visit_offset_ += cur_time_page_header_.compressed_size_;
time_in_stream_.wrapped_buf_advance_read_pos(
cur_time_page_header_.compressed_size_);
for (size_t c = 0; c < num_cols; c++) {
auto* col = value_columns_[c];
col->chunk_visit_offset += col->cur_page_header.compressed_size_;
col->in_stream.wrapped_buf_advance_read_pos(
col->cur_page_header.compressed_size_);
}
}
// Size the per-page result tables: chunk_times_[page] and
// chunk_cols_[column][page].
const size_t np = chunk_pages_.size();
chunk_times_.resize(np);
chunk_cols_.resize(num_cols);
for (uint32_t c = 0; c < num_cols; c++) chunk_cols_[c].resize(np);
chunk_page_cursor_ = 0;
return ret;
}
/**
 * Decode every page recorded by scan_chunk_pages().
 *
 * The time column is decoded first, page by page, into chunk_times_. Each
 * value column is then decoded into chunk_cols_[column]: the page is read and
 * decompressed, its row count and not-null bitmap are parsed, and for
 * fixed-width types the non-null values are decoded into a flat array. For
 * STRING/TEXT/BLOB columns only the bitmap is parsed here and the
 * uncompressed buffer is kept in the page entry. With ENABLE_THREADS and a
 * decode pool the columns are submitted to the pool, otherwise they are
 * processed in a loop.
 *
 * Compared with the previous version:
 *  - a failed page read no longer leaks the heap buffer used for the read;
 *  - a read that returns fewer bytes than the page size is reported as
 *    E_TSFILE_CORRUPTED;
 *  - the row count / bitmap of a value page are validated against the
 *    uncompressed size before they are used.
 *
 * Uncompressed value buffers stored in chunk_cols_ are released later by
 * cleanup_chunk_decode().
 *
 * @return E_OK, E_OOM, E_TSFILE_CORRUPTED, or a file-read error
 */
int AlignedChunkReader::decode_chunk_pages() {
    int ret = E_OK;
    const size_t np = chunk_pages_.size();
    const uint32_t num_cols = value_columns_.size();
    if (np == 0) return ret;

    // Read exactly `size` bytes at `offset` into `stack` when they fit, else
    // into a heap buffer. On success `out` points at the data and `heap`
    // tells whether the caller owns it. On failure nothing is left to free.
    auto file_read_page = [&](int64_t offset, uint32_t size, char* stack,
                              uint32_t stack_sz, char*& out,
                              bool& heap) -> int {
        heap = size > stack_sz;
        out =
            heap ? (char*)common::mem_alloc(size, common::MOD_DEFAULT) : stack;
        if (!out) return common::E_OOM;
        int rlen = 0;
        int r = read_file_->read(offset, out, size, rlen);
        if (r == E_OK && (rlen < 0 || (uint32_t)rlen < size)) {
            r = E_TSFILE_CORRUPTED;  // page body is truncated
        }
        if (r != E_OK && heap) {
            common::mem_free(out);
            out = nullptr;
        }
        return r;
    };

    // ── Time column (serial) ──
    for (size_t p = 0; p < np; p++) {
        auto& info = chunk_pages_[p];
        auto& td = chunk_times_[p];
        td.count = 0;
        td.cursor = 0;
        if (info.time_compressed_size == 0) continue;
        char stk[4096];
        char* cbuf = nullptr;
        bool heap = false;
        if (RET_FAIL(file_read_page(info.time_file_offset,
                                    info.time_compressed_size, stk,
                                    sizeof(stk), cbuf, heap)))
            return ret;
        char* ub = nullptr;
        uint32_t us = 0;
        time_compressor_->reset(false);
        int r = time_compressor_->uncompress(cbuf, info.time_compressed_size,
                                             ub, us);
        // The compressed copy is no longer needed unless the compressor
        // returned it as its output.
        if (heap && cbuf != ub) common::mem_free(cbuf);
        if (r != E_OK || us != info.time_uncompressed_size) {
            if (ub) time_compressor_->after_uncompress(ub);
            return E_TSFILE_CORRUPTED;
        }
        common::ByteStream ts_in;
        ts_in.wrap_from(ub, us);
        time_decoder_->reset();
        td.times.clear();
        while (time_decoder_->has_remaining(ts_in)) {
            int64_t t;
            if (time_decoder_->read_int64(t, ts_in) != E_OK) break;
            td.times.push_back(t);
        }
        td.count = (int)td.times.size();
        time_compressor_->after_uncompress(ub);
    }

    // ── Value column decode lambda ──
    auto decode_val_col = [&](uint32_t c) -> int {
        auto* col = value_columns_[c];
        for (size_t p = 0; p < np; p++) {
            auto& info = chunk_pages_[p];
            auto& cp = chunk_cols_[c][p];
            cp.data_num = 0;
            cp.nonnull_count = 0;
            cp.read_pos = 0;
            cp.uncompressed_buf = nullptr;
            uint32_t csz = info.value_compressed_sizes[c];
            if (csz == 0) continue;
            char stk[4096];
            char* cbuf = nullptr;
            bool heap = false;
            int r = file_read_page(info.value_file_offsets[c], csz, stk,
                                   sizeof(stk), cbuf, heap);
            if (r != E_OK) return r;
            char* ub = nullptr;
            uint32_t us = 0;
            col->compressor->reset(false);
            r = col->compressor->uncompress(cbuf, csz, ub, us);
            if (heap && cbuf != ub) common::mem_free(cbuf);
            if (r != E_OK || us != info.value_uncompressed_sizes[c]) {
                if (ub) col->compressor->after_uncompress(ub);
                return E_TSFILE_CORRUPTED;
            }

            // Page layout: [row count: 4 bytes][bitmap][encoded values].
            // Validate both lengths before touching the buffer.
            if (us < sizeof(uint32_t)) {
                if (ub) col->compressor->after_uncompress(ub);
                return E_TSFILE_CORRUPTED;
            }
            uint32_t off = 0;
            uint32_t data_num = SerializationUtil::read_ui32(ub);
            off += sizeof(uint32_t);
            const uint64_t bitmap_bytes =
                (static_cast<uint64_t>(data_num) + 7) / 8;
            if (bitmap_bytes > us - off) {
                col->compressor->after_uncompress(ub);
                return E_TSFILE_CORRUPTED;
            }
            // From here on the buffer is owned by the page entry.
            cp.uncompressed_buf = ub;
            cp.data_num = data_num;
            cp.bitmap.resize(static_cast<size_t>(bitmap_bytes));
            for (size_t i = 0; i < cp.bitmap.size(); i++)
                cp.bitmap[i] = *(ub + off++);
            char* vbuf = ub + off;
            uint32_t vsize = us - off;
            col->decoder->reset();
            common::ByteStream vi;
            vi.wrap_from(vbuf, vsize);
            auto dt = col->chunk_header.data_type_;
            if (dt == common::STRING || dt == common::TEXT ||
                dt == common::BLOB) {
                // Variable-length values are not pre-decoded here.
                cp.nonnull_count = 0;
                continue;
            }
            // Count non-null rows (bits are MSB-first within each byte).
            const uint32_t nmb = 1 << 7;
            int nn = 0;
            for (uint32_t i = 0; i < data_num; i++)
                if (!cp.bitmap.empty() &&
                    ((cp.bitmap[i / 8] & 0xFF) & (nmb >> (i % 8))) != 0)
                    nn++;
            if (nn == 0) {
                cp.nonnull_count = 0;
                continue;
            }
            uint32_t es = common::get_data_type_size(dt);
            cp.values.resize((size_t)nn * es);
            cp.nonnull_count = 0;
            // Decode up to nn values; stop early if the decoder reports an
            // error, leaving nonnull_count at the number actually decoded.
            switch (dt) {
                case common::BOOLEAN: {
                    bool* out = reinterpret_cast<bool*>(cp.values.data());
                    for (int s = 0; s < nn; s++) {
                        bool v;
                        if (col->decoder->read_boolean(v, vi) != E_OK) break;
                        out[cp.nonnull_count++] = v;
                    }
                    break;
                }
                case common::INT32:
                case common::DATE: {
                    int32_t* out =
                        reinterpret_cast<int32_t*>(cp.values.data());
                    for (int s = 0; s < nn; s++) {
                        int32_t v;
                        if (col->decoder->read_int32(v, vi) != E_OK) break;
                        out[cp.nonnull_count++] = v;
                    }
                    break;
                }
                case common::INT64:
                case common::TIMESTAMP: {
                    int64_t* out =
                        reinterpret_cast<int64_t*>(cp.values.data());
                    for (int s = 0; s < nn; s++) {
                        int64_t v;
                        if (col->decoder->read_int64(v, vi) != E_OK) break;
                        out[cp.nonnull_count++] = v;
                    }
                    break;
                }
                case common::FLOAT: {
                    float* out = reinterpret_cast<float*>(cp.values.data());
                    for (int s = 0; s < nn; s++) {
                        float v;
                        if (col->decoder->read_float(v, vi) != E_OK) break;
                        out[cp.nonnull_count++] = v;
                    }
                    break;
                }
                case common::DOUBLE: {
                    double* out = reinterpret_cast<double*>(cp.values.data());
                    for (int s = 0; s < nn; s++) {
                        double v;
                        if (col->decoder->read_double(v, vi) != E_OK) break;
                        out[cp.nonnull_count++] = v;
                    }
                    break;
                }
                default:
                    break;
            }
        }
        return E_OK;
    };

#ifdef ENABLE_THREADS
    if (decode_pool_ != nullptr) {
        // One task per column; each task writes only its own result slot.
        std::vector<int> col_rets(num_cols, E_OK);
        for (uint32_t c = 0; c < num_cols; c++)
            decode_pool_->submit([&, c]() { col_rets[c] = decode_val_col(c); });
        decode_pool_->wait_all();
        for (uint32_t c = 0; c < num_cols; c++)
            if (col_rets[c] != E_OK) return col_rets[c];
        return ret;
    }
#endif
    for (uint32_t c = 0; c < num_cols && IS_SUCC(ret); c++)
        ret = decode_val_col(c);
    return ret;
}
/**
 * Emit rows from the already-decoded pages of the current chunk through
 * @p row_appender, resuming at chunk_page_cursor_ and at each page's own
 * time cursor.
 *
 * For every emitted row the timestamp goes to column 0 and value column c
 * goes to column c + 1. A value is read from the column page's packed
 * `values` buffer at `read_pos`; rows whose presence bit is clear (or whose
 * column page has no data / no bitmap) are appended as null. Presence bits
 * are tested MSB-first: row r uses mask (0x80 >> (r % 8)) on byte r / 8.
 *
 * Ordering of the per-row checks is: time filter, then offset, then emit.
 * Rows rejected by the filter therefore never consume @p row_offset.
 *
 * @param ret_tsblock  Not used by this function.
 * @param row_appender Receives the rows.
 * @param filter       Optional time filter; only consulted for pages whose
 *                     pass_type is PagePassType::BOUNDARY.
 * @param pa           Not used by this function.
 * @param row_offset   Optional count of leading result rows to drop;
 *                     decremented in place.
 * @param row_limit    Optional cap on emitted rows; decremented in place,
 *                     scattering stops when it reaches 0.
 * @return E_OK, or E_OVERFLOW when row_appender.add_row() reports that no
 *         further row can be added (the current row is left unconsumed).
 */
int AlignedChunkReader::scatter_chunk_pages(TsBlock* ret_tsblock,
                                            RowAppender& row_appender,
                                            Filter* filter, PageArena* pa,
                                            int* row_offset, int* row_limit) {
    (void)ret_tsblock;
    (void)pa;
    const uint32_t null_mask_base = 1 << 7;
    const uint32_t num_cols = value_columns_.size();
    const size_t np = chunk_pages_.size();

    // True when column `c` of the current page stores a value for row `row`.
    auto has_value = [&](uint32_t c, int row) -> bool {
        const auto& cp = chunk_cols_[c][chunk_page_cursor_];
        if (cp.data_num == 0 || cp.bitmap.empty()) return false;
        return ((cp.bitmap[row / 8] & 0xFF) &
                (null_mask_base >> (row % 8))) != 0;
    };

    // Step every column's read_pos past the value (if any) stored for `row`,
    // so the packed value buffers stay aligned with the time cursor.
    auto skip_row_values = [&](int row) {
        for (uint32_t c = 0; c < num_cols; c++) {
            if (has_value(c, row)) {
                chunk_cols_[c][chunk_page_cursor_].read_pos++;
            }
        }
    };

    while (static_cast<size_t>(chunk_page_cursor_) < np) {
        if (row_limit != nullptr && *row_limit == 0) break;

        auto& td = chunk_times_[chunk_page_cursor_];
        if (td.cursor >= td.count) {
            chunk_page_cursor_++;
            continue;
        }

        auto& info = chunk_pages_[chunk_page_cursor_];
        const bool need_filter = (info.pass_type == PagePassType::BOUNDARY);
        // Rows of this page are individually tested against the filter.
        const bool row_filtering = need_filter && filter != nullptr;

        // Page-level offset skip: drop the whole remainder of the page in
        // one step. Only valid when every remaining row is a result row;
        // on a filtered page, rejected rows must not be charged against the
        // offset, so such pages fall through to the row-by-row path.
        if (!row_filtering && row_offset != nullptr && *row_offset > 0 &&
            *row_offset >= (td.count - td.cursor)) {
            *row_offset -= (td.count - td.cursor);
            for (uint32_t c = 0; c < num_cols; c++) {
                auto& cp = chunk_cols_[c][chunk_page_cursor_];
                cp.read_pos = cp.nonnull_count;  // fully consumed
            }
            td.cursor = td.count;
            chunk_page_cursor_++;
            continue;
        }

        // The bulk path needs no filter and requires every column to be a
        // fixed-width type with a decoded value for each of its rows.
        bool can_bulk = !need_filter;
        for (uint32_t c = 0; c < num_cols && can_bulk; c++) {
            const auto& cp = chunk_cols_[c][chunk_page_cursor_];
            const auto dt = value_columns_[c]->chunk_header.data_type_;
            if (dt == common::STRING || dt == common::TEXT ||
                dt == common::BLOB) {
                can_bulk = false;
            } else if (cp.data_num == 0) {
                can_bulk = false;
            } else if (cp.nonnull_count != static_cast<int>(cp.data_num)) {
                can_bulk = false;
            }
        }

        if (can_bulk) {
            while (td.cursor < td.count) {
                if (row_limit != nullptr && *row_limit == 0) break;
                // Row-level offset skip: no nulls here, so every column
                // advances by exactly one value.
                if (row_offset != nullptr && *row_offset > 0) {
                    (*row_offset)--;
                    for (uint32_t c = 0; c < num_cols; c++) {
                        chunk_cols_[c][chunk_page_cursor_].read_pos++;
                    }
                    td.cursor++;
                    continue;
                }
                if (UNLIKELY(!row_appender.add_row())) return E_OVERFLOW;
                int64_t t = td.times[td.cursor];
                row_appender.append(0, reinterpret_cast<char*>(&t),
                                    sizeof(int64_t));
                for (uint32_t c = 0; c < num_cols; c++) {
                    auto& cp = chunk_cols_[c][chunk_page_cursor_];
                    const uint32_t es = common::get_data_type_size(
                        value_columns_[c]->chunk_header.data_type_);
                    row_appender.append(
                        c + 1,
                        cp.values.data() +
                            static_cast<size_t>(cp.read_pos) * es,
                        es);
                    cp.read_pos++;
                }
                td.cursor++;
                if (row_limit != nullptr) (*row_limit)--;
            }
        } else {
            while (td.cursor < td.count) {
                if (row_limit != nullptr && *row_limit == 0) break;
                const int row = td.cursor;
                int64_t t = td.times[row];
                // Filter skip: rejected rows do not consume the offset.
                if (row_filtering && !filter->satisfy_start_end_time(t, t)) {
                    skip_row_values(row);
                    td.cursor++;
                    continue;
                }
                // Offset skip: a result row that the caller asked to drop.
                if (row_offset != nullptr && *row_offset > 0) {
                    (*row_offset)--;
                    skip_row_values(row);
                    td.cursor++;
                    continue;
                }
                if (UNLIKELY(!row_appender.add_row())) return E_OVERFLOW;
                row_appender.append(0, reinterpret_cast<char*>(&t),
                                    sizeof(int64_t));
                for (uint32_t c = 0; c < num_cols; c++) {
                    if (!has_value(c, row)) {
                        row_appender.append_null(c + 1);
                        continue;
                    }
                    auto& cp = chunk_cols_[c][chunk_page_cursor_];
                    const uint32_t es = common::get_data_type_size(
                        value_columns_[c]->chunk_header.data_type_);
                    row_appender.append(
                        c + 1,
                        cp.values.data() +
                            static_cast<size_t>(cp.read_pos) * es,
                        es);
                    cp.read_pos++;
                }
                td.cursor++;
                if (row_limit != nullptr) (*row_limit)--;
            }
        }

        // Move on only once the page is exhausted. When the row limit cut
        // the inner loop short, the page keeps its unread rows and the
        // limit check at the top of the loop ends the scan.
        if (td.cursor >= td.count) {
            chunk_page_cursor_++;
        }
    }
    return E_OK;
}
} // end namespace storage