| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * License); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "aligned_chunk_reader.h" |
| |
| #include <algorithm> |
| |
| #include "compress/compressor_factory.h" |
| #include "encoding/decoder_factory.h" |
| |
| using namespace common; |
| namespace storage { |
| |
| int AlignedChunkReader::init(ReadFile* read_file, String m_name, |
| TSDataType data_type, Filter* time_filter) { |
| read_file_ = read_file; |
| measurement_name_.shallow_copy_from(m_name); |
| time_decoder_ = DecoderFactory::alloc_time_decoder(); |
| value_decoder_ = nullptr; |
| time_compressor_ = nullptr; |
| value_compressor_ = nullptr; |
| time_filter_ = time_filter; |
| time_uncompressed_buf_ = nullptr; |
| value_uncompressed_buf_ = nullptr; |
| if (IS_NULL(time_decoder_)) { |
| return E_OOM; |
| } |
| return E_OK; |
| } |
| |
| void AlignedChunkReader::reset() { |
| time_chunk_meta_ = nullptr; |
| value_chunk_meta_ = nullptr; |
| time_chunk_header_.reset(); |
| value_chunk_header_.reset(); |
| cur_time_page_header_.reset(); |
| cur_value_page_header_.reset(); |
| |
| char* file_data_buf = time_in_stream_.get_wrapped_buf(); |
| if (file_data_buf != nullptr) { |
| mem_free(file_data_buf); |
| } |
| time_in_stream_.reset(); |
| file_data_buf = value_in_stream_.get_wrapped_buf(); |
| if (file_data_buf != nullptr) { |
| mem_free(file_data_buf); |
| } |
| value_in_stream_.reset(); |
| file_data_time_buf_size_ = 0; |
| file_data_value_buf_size_ = 0; |
| time_chunk_visit_offset_ = 0; |
| value_chunk_visit_offset_ = 0; |
| |
| // Free leftover uncompressed buffers from the previous chunk. |
| if (time_uncompressed_buf_ != nullptr && time_compressor_ != nullptr) { |
| time_compressor_->after_uncompress(time_uncompressed_buf_); |
| time_uncompressed_buf_ = nullptr; |
| } |
| |
| // Multi-value reset |
| for (auto* col : value_columns_) { |
| // Free uncompressed buffer before resetting. |
| if (col->uncompressed_buf != nullptr && col->compressor != nullptr) { |
| col->compressor->after_uncompress(col->uncompressed_buf); |
| col->uncompressed_buf = nullptr; |
| } |
| char* buf = col->in_stream.get_wrapped_buf(); |
| if (buf != nullptr) mem_free(buf); |
| col->in_stream.reset(); |
| col->in.reset(); |
| col->chunk_header.reset(); |
| col->cur_page_header.reset(); |
| col->file_data_buf_size = 0; |
| col->chunk_visit_offset = 0; |
| col->notnull_bitmap.clear(); |
| col->cur_value_index = -1; |
| col->chunk_meta = nullptr; |
| // Note: decoder/compressor are NOT freed here — they are reused by |
| // alloc_compressor_and_decoder() in load_by_aligned_meta_multi(). |
| } |
| } |
| |
| void AlignedChunkReader::destroy() { |
| if (time_decoder_ != nullptr) { |
| time_decoder_->~Decoder(); |
| DecoderFactory::free(time_decoder_); |
| time_decoder_ = nullptr; |
| } |
| if (value_decoder_ != nullptr) { |
| value_decoder_->~Decoder(); |
| DecoderFactory::free(value_decoder_); |
| value_decoder_ = nullptr; |
| } |
| if (time_compressor_ != nullptr) { |
| time_compressor_->~Compressor(); |
| CompressorFactory::free(time_compressor_); |
| time_compressor_ = nullptr; |
| } |
| if (value_compressor_ != nullptr) { |
| value_compressor_->~Compressor(); |
| CompressorFactory::free(value_compressor_); |
| value_compressor_ = nullptr; |
| } |
| char* buf = time_in_stream_.get_wrapped_buf(); |
| if (buf != nullptr) { |
| mem_free(buf); |
| time_in_stream_.clear_wrapped_buf(); |
| } |
| cur_time_page_header_.reset(); |
| buf = value_in_stream_.get_wrapped_buf(); |
| if (buf != nullptr) { |
| mem_free(buf); |
| value_in_stream_.clear_wrapped_buf(); |
| } |
| cur_value_page_header_.reset(); |
| chunk_header_.~ChunkHeader(); |
| |
| // Multi-value destroy |
| for (auto* col : value_columns_) { |
| if (col->decoder != nullptr) { |
| col->decoder->~Decoder(); |
| DecoderFactory::free(col->decoder); |
| col->decoder = nullptr; |
| } |
| if (col->compressor != nullptr) { |
| col->compressor->~Compressor(); |
| CompressorFactory::free(col->compressor); |
| col->compressor = nullptr; |
| } |
| buf = col->in_stream.get_wrapped_buf(); |
| if (buf != nullptr) { |
| mem_free(buf); |
| col->in_stream.clear_wrapped_buf(); |
| } |
| col->cur_page_header.reset(); |
| delete col; |
| } |
| value_columns_.clear(); |
| if (decode_pool_ != nullptr) { |
| delete decode_pool_; |
| decode_pool_ = nullptr; |
| } |
| } |
| |
| int AlignedChunkReader::load_by_aligned_meta(ChunkMeta* time_chunk_meta, |
| ChunkMeta* value_chunk_meta) { |
| int ret = E_OK; |
| time_chunk_meta_ = time_chunk_meta; |
| value_chunk_meta_ = value_chunk_meta; |
| #if DEBUG_SE |
| std::cout << "AlignedChunkReader::load_by_meta, meta=" << *time_chunk_meta |
| << ", " << *value_chunk_meta << std::endl; |
| #endif |
| /* ================ deserialize time_chunk_header ================*/ |
| // TODO configurable |
| file_data_time_buf_size_ = 1024; |
| file_data_value_buf_size_ = 1024; |
| int32_t ret_read_len = 0; |
| char* time_file_data_buf = |
| (char*)mem_alloc(file_data_time_buf_size_, MOD_CHUNK_READER); |
| if (IS_NULL(time_file_data_buf)) { |
| return E_OOM; |
| } |
| ret = read_file_->read(time_chunk_meta_->offset_of_chunk_header_, |
| time_file_data_buf, file_data_time_buf_size_, |
| ret_read_len); |
| if (IS_SUCC(ret) && ret_read_len < ChunkHeader::MIN_SERIALIZED_SIZE) { |
| ret = E_TSFILE_CORRUPTED; |
| LOGE("file corrupted, ret=" << ret << ", offset=" |
| << time_chunk_meta_->offset_of_chunk_header_ |
| << "read_len=" << ret_read_len); |
| mem_free(time_file_data_buf); |
| } |
| if (IS_SUCC(ret)) { |
| time_in_stream_.wrap_from(time_file_data_buf, ret_read_len); |
| if (RET_FAIL(time_chunk_header_.deserialize_from(time_in_stream_))) { |
| } else { |
| time_chunk_visit_offset_ = time_in_stream_.read_pos(); |
| } |
| } |
| /* ================ deserialize value_chunk_header ================*/ |
| ret_read_len = 0; |
| char* value_file_data_buf = |
| (char*)mem_alloc(file_data_value_buf_size_, MOD_CHUNK_READER); |
| if (IS_NULL(value_file_data_buf)) { |
| return E_OOM; |
| } |
| ret = read_file_->read(value_chunk_meta_->offset_of_chunk_header_, |
| value_file_data_buf, file_data_value_buf_size_, |
| ret_read_len); |
| if (IS_SUCC(ret) && ret_read_len < ChunkHeader::MIN_SERIALIZED_SIZE) { |
| ret = E_TSFILE_CORRUPTED; |
| LOGE("file corrupted, ret=" |
| << ret << ", offset=" << value_chunk_meta_->offset_of_chunk_header_ |
| << "read_len=" << ret_read_len); |
| mem_free(value_file_data_buf); |
| } |
| if (IS_SUCC(ret)) { |
| value_in_stream_.wrap_from(value_file_data_buf, ret_read_len); |
| if (RET_FAIL(value_chunk_header_.deserialize_from(value_in_stream_))) { |
| } else if (RET_FAIL(alloc_compressor_and_decoder( |
| time_decoder_, time_compressor_, |
| time_chunk_header_.encoding_type_, |
| time_chunk_header_.data_type_, |
| time_chunk_header_.compression_type_))) { |
| } else if (RET_FAIL(alloc_compressor_and_decoder( |
| value_decoder_, value_compressor_, |
| value_chunk_header_.encoding_type_, |
| value_chunk_header_.data_type_, |
| value_chunk_header_.compression_type_))) { |
| } else { |
| value_chunk_visit_offset_ = value_in_stream_.read_pos(); |
| #if DEBUG_SE |
| std::cout << "AlignedChunkReader::load_by_meta, time_chunk_header=" |
| << time_chunk_header_ |
| << ", value_chunk_header=" << value_chunk_header_ |
| << std::endl; |
| #endif |
| } |
| } |
| return ret; |
| } |
| |
| int AlignedChunkReader::alloc_compressor_and_decoder( |
| storage::Decoder*& decoder, storage::Compressor*& compressor, |
| TSEncoding encoding, TSDataType data_type, CompressionType compression) { |
| if (decoder != nullptr) { |
| decoder->reset(); |
| } else { |
| decoder = DecoderFactory::alloc_value_decoder(encoding, data_type); |
| if (IS_NULL(decoder)) { |
| return E_OOM; |
| } |
| } |
| |
| if (compressor != nullptr) { |
| compressor->reset(false); |
| } else { |
| compressor = CompressorFactory::alloc_compressor(compression); |
| if (compressor == nullptr) { |
| return E_OOM; |
| } |
| } |
| return E_OK; |
| } |
| |
| int AlignedChunkReader::get_next_page(TsBlock* ret_tsblock, |
| Filter* oneshoot_filter, PageArena& pa) { |
| if (multi_value_mode_) { |
| return get_next_page_multi(ret_tsblock, oneshoot_filter, pa); |
| } |
| int ret = E_OK; |
| Filter* filter = |
| (oneshoot_filter != nullptr ? oneshoot_filter : time_filter_); |
| bool pt = prev_time_page_not_finish(); |
| bool pv = prev_value_page_not_finish(); |
| if (pt && pv) { |
| ret = decode_time_value_buf_into_tsblock(ret_tsblock, oneshoot_filter, |
| &pa); |
| return ret; |
| } |
| if (!pt && !pv) { |
| while (IS_SUCC(ret)) { |
| if (RET_FAIL(get_cur_page_header( |
| time_chunk_meta_, time_in_stream_, cur_time_page_header_, |
| time_chunk_visit_offset_, time_chunk_header_))) { |
| } else if (RET_FAIL(get_cur_page_header( |
| value_chunk_meta_, value_in_stream_, |
| cur_value_page_header_, value_chunk_visit_offset_, |
| value_chunk_header_))) { |
| } else if (cur_page_statisify_filter(filter)) { |
| break; |
| } else if (RET_FAIL(skip_cur_page())) { |
| } |
| if (!has_more_data()) { |
| ret = E_NO_MORE_DATA; |
| break; |
| } |
| } |
| if (IS_SUCC(ret)) { |
| ret = decode_cur_time_page_data() || decode_cur_value_page_data(); |
| } |
| } |
| if (IS_SUCC(ret)) { |
| ret = decode_time_value_buf_into_tsblock(ret_tsblock, oneshoot_filter, |
| &pa); |
| } |
| return ret; |
| } |
| |
| int AlignedChunkReader::get_cur_page_header(ChunkMeta*& chunk_meta, |
| common::ByteStream& in_stream, |
| PageHeader& cur_page_header, |
| uint32_t& chunk_visit_offset, |
| ChunkHeader& chunk_header, |
| int32_t* override_buf_size) { |
| int ret = E_OK; |
| bool retry = true; |
| int cur_page_header_serialized_size = 0; |
| // TODO: configurable |
| int retry_read_want_size = 1024; |
| if (chunk_visit_offset - chunk_header.serialized_size_ >= |
| chunk_header.data_size_) { |
| cur_page_header.reset(); |
| return E_OK; |
| } |
| |
| do { |
| in_stream.mark_read_pos(); |
| cur_page_header.reset(); |
| ret = cur_page_header.deserialize_from( |
| in_stream, !chunk_has_only_one_page(chunk_header), |
| chunk_header.data_type_); |
| cur_page_header_serialized_size = in_stream.get_mark_len(); |
| if (deserialize_buf_not_enough(ret) && retry) { |
| retry = false; |
| retry_read_want_size += 1024; |
| int32_t& file_data_buf_size = |
| override_buf_size != nullptr ? *override_buf_size |
| : chunk_header.data_type_ == common::VECTOR |
| ? file_data_time_buf_size_ |
| : file_data_value_buf_size_; |
| // do not shrink buffer for page header, otherwise, the buffer is |
| // most likely to grow back when reading page data |
| if (E_OK == read_from_file_and_rewrap( |
| in_stream, chunk_meta, chunk_visit_offset, |
| file_data_buf_size, retry_read_want_size, false)) { |
| continue; |
| } |
| } |
| break; |
| } while (true); |
| if (IS_SUCC(ret)) { |
| // visit a header |
| chunk_visit_offset += cur_page_header_serialized_size; |
| } |
| #if DEBUG_SE |
| std::cout << "get_cur_page_header, ret=" << ret << ", retry=" << retry |
| << ", cur_page_header=" << cur_page_header |
| << ", chunk_meta->offset_of_chunk_header_=" |
| << chunk_meta->offset_of_chunk_header_ |
| << ", cur_page_header_serialized_size=" |
| << cur_page_header_serialized_size << std::endl; |
| #endif |
| return ret; |
| } |
| |
| // reader at least @want_size bytes from file and wrap the buffer into |
| // @in_stream_ |
| int AlignedChunkReader::read_from_file_and_rewrap( |
| common::ByteStream& in_stream_, ChunkMeta*& chunk_meta, |
| uint32_t& chunk_visit_offset, int32_t& file_data_buf_size, int want_size, |
| bool may_shrink) { |
| int ret = E_OK; |
| const int DEFAULT_READ_SIZE = 4096; // may use page_size + page_header_size |
| char* file_data_buf = in_stream_.get_wrapped_buf(); |
| int offset = chunk_meta->offset_of_chunk_header_ + chunk_visit_offset; |
| int read_size = |
| (want_size < DEFAULT_READ_SIZE ? DEFAULT_READ_SIZE : want_size); |
| if (file_data_buf_size < read_size || |
| (may_shrink && read_size < file_data_buf_size / 10)) { |
| file_data_buf = (char*)mem_realloc(file_data_buf, read_size); |
| if (IS_NULL(file_data_buf)) { |
| return E_OOM; |
| } |
| file_data_buf_size = read_size; |
| } |
| int ret_read_len = 0; |
| if (RET_FAIL( |
| read_file_->read(offset, file_data_buf, read_size, ret_read_len))) { |
| } else { |
| in_stream_.wrap_from(file_data_buf, ret_read_len); |
| #ifdef DEBUG_SE |
| std::cout << "file offset = " << offset << " len = " << ret_read_len |
| << std::endl; |
| DEBUG_hex_dump_buf("wrapped buf = ", file_data_buf, 256); |
| #endif |
| } |
| return ret; |
| } |
| |
| bool AlignedChunkReader::cur_page_statisify_filter(Filter* filter) { |
| bool value_satisfy = filter == nullptr || |
| cur_value_page_header_.statistic_ == nullptr || |
| filter->satisfy(cur_value_page_header_.statistic_); |
| bool time_satisfy = filter == nullptr || |
| cur_time_page_header_.statistic_ == nullptr || |
| filter->satisfy(cur_time_page_header_.statistic_); |
| return time_satisfy && value_satisfy; |
| } |
| |
| int AlignedChunkReader::skip_cur_page() { |
| int ret = E_OK; |
| // visit a page tv data |
| time_chunk_visit_offset_ += cur_time_page_header_.compressed_size_; |
| time_in_stream_.wrapped_buf_advance_read_pos( |
| cur_time_page_header_.compressed_size_); |
| value_chunk_visit_offset_ += cur_value_page_header_.compressed_size_; |
| value_in_stream_.wrapped_buf_advance_read_pos( |
| cur_value_page_header_.compressed_size_); |
| return ret; |
| } |
| |
| int AlignedChunkReader::decode_cur_time_page_data() { |
| int ret = E_OK; |
| |
| // Step 1: make sure we load the whole page data in @in_stream_ |
| if (time_in_stream_.remaining_size() < |
| cur_time_page_header_.compressed_size_) { |
| // std::cout << "decode_cur_page_data. in_stream_.remaining_size="<< |
| // in_stream_.remaining_size() << ", cur_page_header_.compressed_size_=" |
| // << cur_page_header_.compressed_size_ << std::endl; |
| if (RET_FAIL(read_from_file_and_rewrap( |
| time_in_stream_, time_chunk_meta_, time_chunk_visit_offset_, |
| file_data_time_buf_size_, |
| cur_time_page_header_.compressed_size_))) { |
| } |
| } |
| |
| char* time_compressed_buf = nullptr; |
| char* time_uncompressed_buf = nullptr; |
| uint32_t time_compressed_buf_size = 0; |
| uint32_t time_uncompressed_buf_size = 0; |
| |
| // Step 2: do uncompress |
| if (IS_SUCC(ret)) { |
| time_compressed_buf = |
| time_in_stream_.get_wrapped_buf() + time_in_stream_.read_pos(); |
| #ifdef DEBUG_SE |
| std::cout << "AlignedChunkReader::decode_cur_page_data,time_in_stream_." |
| "get_wrapped_buf=" |
| << (void*)(time_in_stream_.get_wrapped_buf()) |
| << ", time_in_stream_.read_pos=" << time_in_stream_.read_pos() |
| << std::endl; |
| #endif |
| time_compressed_buf_size = cur_time_page_header_.compressed_size_; |
| time_in_stream_.wrapped_buf_advance_read_pos(time_compressed_buf_size); |
| time_chunk_visit_offset_ += time_compressed_buf_size; |
| if (RET_FAIL(time_compressor_->reset(false))) { |
| } else if (RET_FAIL(time_compressor_->uncompress( |
| time_compressed_buf, time_compressed_buf_size, |
| time_uncompressed_buf, time_uncompressed_buf_size))) { |
| } else { |
| time_uncompressed_buf_ = time_uncompressed_buf; |
| } |
| #ifdef DEBUG_SE |
| DEBUG_hex_dump_buf( |
| "AlignedChunkReader reader, time_uncompressed buf = ", |
| time_uncompressed_buf, time_uncompressed_buf_size); |
| #endif |
| if (ret != E_OK || time_uncompressed_buf_size != |
| cur_time_page_header_.uncompressed_size_) { |
| ret = E_TSFILE_CORRUPTED; |
| ASSERT(false); |
| } |
| } |
| |
| time_decoder_->reset(); |
| #ifdef DEBUG_SE |
| DEBUG_hex_dump_buf("AlignedChunkReader reader, time_buf = ", time_buf, |
| time_buf_size); |
| #endif |
| time_in_.wrap_from(time_uncompressed_buf_, time_uncompressed_buf_size); |
| return ret; |
| } |
| |
| int AlignedChunkReader::decode_cur_value_page_data() { |
| int ret = E_OK; |
| |
| // Step 1: make sure we load the whole page data in @in_stream_ |
| if (value_in_stream_.remaining_size() < |
| cur_value_page_header_.compressed_size_) { |
| // std::cout << "decode_cur_page_data. in_stream_.remaining_size="<< |
| // in_stream_.remaining_size() << ", cur_page_header_.compressed_size_=" |
| // << cur_page_header_.compressed_size_ << std::endl; |
| if (RET_FAIL(read_from_file_and_rewrap( |
| value_in_stream_, value_chunk_meta_, value_chunk_visit_offset_, |
| file_data_value_buf_size_, |
| cur_value_page_header_.compressed_size_))) { |
| } |
| } |
| |
| char* value_compressed_buf = nullptr; |
| char* value_uncompressed_buf = nullptr; |
| uint32_t value_compressed_buf_size = 0; |
| uint32_t value_uncompressed_buf_size = 0; |
| char* value_buf = nullptr; |
| uint32_t value_buf_size = 0; |
| |
| if (cur_value_page_header_.compressed_size_ == 0) { |
| value_in_.wrap_from(value_buf, 0); |
| return E_OK; |
| } |
| |
| // Step 2: do uncompress |
| if (IS_SUCC(ret)) { |
| value_compressed_buf = |
| value_in_stream_.get_wrapped_buf() + value_in_stream_.read_pos(); |
| value_compressed_buf_size = cur_value_page_header_.compressed_size_; |
| value_in_stream_.wrapped_buf_advance_read_pos( |
| value_compressed_buf_size); |
| value_chunk_visit_offset_ += value_compressed_buf_size; |
| if (RET_FAIL(value_compressor_->reset(false))) { |
| } else if (RET_FAIL(value_compressor_->uncompress( |
| value_compressed_buf, value_compressed_buf_size, |
| value_uncompressed_buf, value_uncompressed_buf_size))) { |
| } else { |
| value_uncompressed_buf_ = value_uncompressed_buf; |
| } |
| #ifdef DEBUG_SE |
| DEBUG_hex_dump_buf( |
| "AlignedChunkReader reader, value_uncompressed buf = ", |
| value_uncompressed_buf, value_uncompressed_buf_size); |
| #endif |
| if (ret != E_OK || value_uncompressed_buf_size != |
| cur_value_page_header_.uncompressed_size_) { |
| ret = E_TSFILE_CORRUPTED; |
| ASSERT(false); |
| } |
| } |
| // Step 3: get value_buf |
| if (IS_SUCC(ret)) { |
| uint32_t value_uncompressed_buf_offset = 0; |
| value_page_data_num_ = |
| SerializationUtil::read_ui32(value_uncompressed_buf); |
| value_uncompressed_buf_offset += sizeof(uint32_t); |
| value_page_col_notnull_bitmap_.resize((value_page_data_num_ + 7) / 8); |
| for (unsigned char& i : value_page_col_notnull_bitmap_) { |
| i = *(value_uncompressed_buf + value_uncompressed_buf_offset); |
| value_uncompressed_buf_offset++; |
| } |
| cur_value_index = -1; |
| value_buf = value_uncompressed_buf + value_uncompressed_buf_offset; |
| value_buf_size = |
| value_uncompressed_buf_size - value_uncompressed_buf_offset; |
| } |
| value_decoder_->reset(); |
| #ifdef DEBUG_SE |
| DEBUG_hex_dump_buf("AlignedChunkReader reader, value_buf = ", value_buf, |
| value_buf_size); |
| #endif |
| value_in_.wrap_from(value_buf, value_buf_size); |
| return ret; |
| } |
| |
| int AlignedChunkReader::decode_time_value_buf_into_tsblock( |
| TsBlock*& ret_tsblock, Filter* filter, common::PageArena* pa) { |
| int ret = common::E_OK; |
| ret = decode_tv_buf_into_tsblock_by_datatype(time_in_, value_in_, |
| ret_tsblock, filter, pa); |
| // if we return during @decode_tv_buf_into_tsblock, we should keep |
| // @uncompressed_buf_ valid until all TV pairs are decoded. |
| if (ret != E_OVERFLOW) { |
| if (time_uncompressed_buf_ != nullptr) { |
| time_compressor_->after_uncompress(time_uncompressed_buf_); |
| time_uncompressed_buf_ = nullptr; |
| } |
| if (value_uncompressed_buf_ != nullptr) { |
| value_compressor_->after_uncompress(value_uncompressed_buf_); |
| value_uncompressed_buf_ = nullptr; |
| } |
| if (!prev_value_page_not_finish()) { |
| value_in_.reset(); |
| } |
| if (!prev_time_page_not_finish()) { |
| time_in_.reset(); |
| } |
| value_page_col_notnull_bitmap_.clear(); |
| value_page_col_notnull_bitmap_.shrink_to_fit(); |
| } else { |
| ret = E_OK; |
| } |
| return ret; |
| } |
| |
/*
 * DECODE_TYPED_TV_INTO_TSBLOCK(CppType, ReadType, time_in, value_in,
 *                              row_appender)
 *
 * Row-by-row decode of an aligned time/value page pair into row_appender
 * (column 0 = time, column 1 = value or null). It is meant to be expanded
 * inside a member function returning int that has `ret` and `filter` in
 * scope. It reads and writes cur_value_index and uses time_decoder_,
 * value_decoder_ and value_page_col_notnull_bitmap_. Values are read with
 * value_decoder_->read_##ReadType into a CppType.
 *
 * A row is null when the bitmap is empty or the row's bit is clear. On
 * E_OVERFLOW nothing of the rejected row has been consumed and
 * cur_value_index is rolled back, so the next expansion resumes with that
 * row; this holds for null rows as well as non-null ones. The macro may
 * `return common::E_DATA_INCONSISTENCY` from the enclosing function when the
 * bitmap promises a value that the value stream does not have.
 */
#define DECODE_TYPED_TV_INTO_TSBLOCK(CppType, ReadType, time_in, value_in, \
                                     row_appender)                         \
    do {                                                                   \
        uint32_t mask = 1 << 7;                                            \
        int64_t time = 0;                                                  \
        CppType value;                                                     \
        while (time_decoder_->has_remaining(time_in)) {                    \
            cur_value_index++;                                             \
            if (value_page_col_notnull_bitmap_.empty() ||                  \
                ((value_page_col_notnull_bitmap_[cur_value_index / 8] &    \
                  0xFF) &                                                  \
                 (mask >> (cur_value_index % 8))) == 0) {                  \
                /* Reserve the row BEFORE consuming the timestamp so that  \
                 * an overflow does not silently drop this null row. */    \
                if (UNLIKELY(!row_appender.add_row())) {                   \
                    ret = E_OVERFLOW;                                      \
                    cur_value_index--;                                     \
                    break;                                                 \
                }                                                          \
                ret = time_decoder_->read_int64(time, time_in);            \
                if (ret != E_OK) {                                         \
                    row_appender.backoff_add_row();                        \
                    break;                                                 \
                }                                                          \
                row_appender.append(0, (char*)&time, sizeof(time));        \
                row_appender.append_null(1);                               \
                continue;                                                  \
            }                                                              \
            assert(value_decoder_->has_remaining(value_in));               \
            if (!value_decoder_->has_remaining(value_in)) {                \
                return common::E_DATA_INCONSISTENCY;                       \
            }                                                              \
            if (UNLIKELY(!row_appender.add_row())) {                       \
                ret = E_OVERFLOW;                                          \
                cur_value_index--;                                         \
                break;                                                     \
            } else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) { \
            } else if (RET_FAIL(value_decoder_->read_##ReadType(value,     \
                                                               value_in))) { \
            } else if (filter != nullptr && !filter->satisfy(time, value)) { \
                row_appender.backoff_add_row();                            \
                continue;                                                  \
            } else {                                                       \
                row_appender.append(0, (char*)&time, sizeof(time));        \
                row_appender.append(1, (char*)&value, sizeof(value));      \
            }                                                              \
        }                                                                  \
    } while (false)
| |
| int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK( |
| ByteStream& time_in, ByteStream& value_in, RowAppender& row_appender, |
| Filter* filter) { |
| int ret = E_OK; |
| uint32_t mask = 1 << 7; |
| int64_t time = 0; |
| int32_t value; |
| while (time_decoder_->has_remaining(time_in)) { |
| cur_value_index++; |
| if (value_page_col_notnull_bitmap_.empty() || |
| ((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) & |
| (mask >> (cur_value_index % 8))) == 0) { |
| ret = time_decoder_->read_int64(time, time_in); |
| if (ret != E_OK) { |
| break; |
| } |
| if (UNLIKELY(!row_appender.add_row())) { |
| ret = E_OVERFLOW; |
| break; |
| } |
| row_appender.append(0, (char*)&time, sizeof(time)); |
| row_appender.append_null(1); |
| continue; |
| } |
| assert(value_decoder_->has_remaining(value_in)); |
| if (!value_decoder_->has_remaining(value_in)) { |
| return common::E_DATA_INCONSISTENCY; |
| } |
| if (UNLIKELY(!row_appender.add_row())) { |
| ret = E_OVERFLOW; |
| cur_value_index--; |
| break; |
| } else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) { |
| } else if (RET_FAIL(value_decoder_->read_int32(value, value_in))) { |
| } else if (filter != nullptr && !filter->satisfy(time, value)) { |
| row_appender.backoff_add_row(); |
| continue; |
| } else { |
| /*std::cout << "decoder: time=" << time << ", value=" << value |
| * << std::endl;*/ |
| row_appender.append(0, (char*)&time, sizeof(time)); |
| row_appender.append(1, (char*)&value, sizeof(value)); |
| } |
| } |
| return ret; |
| } |
| |
| int AlignedChunkReader::i32_DECODE_TV_BATCH(ByteStream& time_in, |
| ByteStream& value_in, |
| RowAppender& row_appender, |
| Filter* filter) { |
| int ret = E_OK; |
| const int BATCH = 129; |
| int64_t times[BATCH]; |
| int32_t values[BATCH]; |
| const uint32_t null_mask_base = 1 << 7; |
| |
| while (time_decoder_->has_remaining(time_in)) { |
| if (row_appender.remaining() < (uint32_t)BATCH) { |
| ret = E_OVERFLOW; |
| break; |
| } |
| |
| // Block-level time filter check |
| bool block_all_pass = false; |
| if (filter != nullptr) { |
| int64_t block_min, block_max; |
| int block_count; |
| if (time_decoder_->peek_next_block_range_int64( |
| time_in, block_min, block_max, block_count)) { |
| if (!filter->satisfy_start_end_time(block_min, block_max)) { |
| int skipped = 0; |
| time_decoder_->skip_peeked_block_int64(time_in, skipped); |
| int nonnull = 0; |
| for (int i = 0; i < block_count; ++i) { |
| int vi = cur_value_index + 1 + i; |
| if (!value_page_col_notnull_bitmap_.empty() && |
| ((value_page_col_notnull_bitmap_[vi / 8] & 0xFF) & |
| (null_mask_base >> (vi % 8))) != 0) { |
| ++nonnull; |
| } |
| } |
| cur_value_index += block_count; |
| if (nonnull > 0) { |
| int sk = 0; |
| value_decoder_->skip_int32(nonnull, sk, value_in); |
| } |
| continue; |
| } |
| if (filter->contain_start_end_time(block_min, block_max)) { |
| block_all_pass = true; |
| } |
| } |
| } |
| |
| int time_count = 0; |
| if (RET_FAIL(time_decoder_->read_batch_int64( |
| times, BATCH, time_count, time_in))) { |
| break; |
| } |
| if (time_count == 0) break; |
| |
| bool is_null[BATCH]; |
| int nonnull_count = 0; |
| for (int i = 0; i < time_count; ++i) { |
| int vi = cur_value_index + 1 + i; |
| if (value_page_col_notnull_bitmap_.empty() || |
| ((value_page_col_notnull_bitmap_[vi / 8] & 0xFF) & |
| (null_mask_base >> (vi % 8))) == 0) { |
| is_null[i] = true; |
| } else { |
| is_null[i] = false; |
| ++nonnull_count; |
| } |
| } |
| |
| bool time_mask[BATCH]; |
| int pass_count = time_count; |
| if (filter != nullptr && !block_all_pass) { |
| pass_count = filter->satisfy_batch_time(times, time_count, |
| time_mask); |
| } |
| |
| if (pass_count == 0) { |
| if (nonnull_count > 0) { |
| int skipped = 0; |
| value_decoder_->skip_int32(nonnull_count, skipped, value_in); |
| } |
| cur_value_index += time_count; |
| continue; |
| } |
| |
| int value_count = 0; |
| if (nonnull_count > 0) { |
| if (RET_FAIL(value_decoder_->read_batch_int32( |
| values, nonnull_count, value_count, value_in))) { |
| break; |
| } |
| } |
| |
| int val_idx = 0; |
| for (int i = 0; i < time_count; ++i) { |
| cur_value_index++; |
| if (filter != nullptr && !block_all_pass && !time_mask[i]) { |
| if (!is_null[i]) ++val_idx; |
| continue; |
| } |
| if (is_null[i]) { |
| if (UNLIKELY(!row_appender.add_row())) { |
| ret = E_OVERFLOW; |
| break; |
| } |
| row_appender.append(0, (char*)×[i], sizeof(int64_t)); |
| row_appender.append_null(1); |
| } else { |
| int32_t val = values[val_idx++]; |
| if (filter != nullptr && !block_all_pass && |
| !filter->satisfy(times[i], (int64_t)val)) { |
| continue; |
| } |
| if (UNLIKELY(!row_appender.add_row())) { |
| ret = E_OVERFLOW; |
| break; |
| } |
| row_appender.append(0, (char*)×[i], sizeof(int64_t)); |
| row_appender.append(1, (char*)&val, sizeof(int32_t)); |
| } |
| } |
| if (ret != E_OK) break; |
| } |
| return ret; |
| } |
| |
| int AlignedChunkReader::i64_DECODE_TV_BATCH(ByteStream& time_in, |
| ByteStream& value_in, |
| RowAppender& row_appender, |
| Filter* filter) { |
| int ret = E_OK; |
| const int BATCH = 129; |
| int64_t times[BATCH]; |
| int64_t values[BATCH]; |
| const uint32_t null_mask_base = 1 << 7; |
| |
| while (time_decoder_->has_remaining(time_in)) { |
| if (row_appender.remaining() < (uint32_t)BATCH) { |
| ret = E_OVERFLOW; |
| break; |
| } |
| |
| // Block-level time filter check: skip entire block if out of range |
| bool block_all_pass = false; |
| if (filter != nullptr) { |
| int64_t block_min, block_max; |
| int block_count; |
| if (time_decoder_->peek_next_block_range_int64( |
| time_in, block_min, block_max, block_count)) { |
| if (!filter->satisfy_start_end_time(block_min, block_max)) { |
| int skipped = 0; |
| time_decoder_->skip_peeked_block_int64(time_in, skipped); |
| int nonnull = 0; |
| for (int i = 0; i < block_count; ++i) { |
| int vi = cur_value_index + 1 + i; |
| if (!value_page_col_notnull_bitmap_.empty() && |
| ((value_page_col_notnull_bitmap_[vi / 8] & 0xFF) & |
| (null_mask_base >> (vi % 8))) != 0) { |
| ++nonnull; |
| } |
| } |
| cur_value_index += block_count; |
| if (nonnull > 0) { |
| int sk = 0; |
| value_decoder_->skip_int64(nonnull, sk, value_in); |
| } |
| continue; |
| } |
| if (filter->contain_start_end_time(block_min, block_max)) { |
| block_all_pass = true; |
| } |
| } |
| } |
| |
| int time_count = 0; |
| if (RET_FAIL(time_decoder_->read_batch_int64( |
| times, BATCH, time_count, time_in))) { |
| break; |
| } |
| if (time_count == 0) break; |
| |
| bool is_null[BATCH]; |
| int nonnull_count = 0; |
| for (int i = 0; i < time_count; ++i) { |
| int vi = cur_value_index + 1 + i; |
| if (value_page_col_notnull_bitmap_.empty() || |
| ((value_page_col_notnull_bitmap_[vi / 8] & 0xFF) & |
| (null_mask_base >> (vi % 8))) == 0) { |
| is_null[i] = true; |
| } else { |
| is_null[i] = false; |
| ++nonnull_count; |
| } |
| } |
| |
| bool time_mask[BATCH]; |
| int pass_count = time_count; |
| if (filter != nullptr && !block_all_pass) { |
| pass_count = filter->satisfy_batch_time(times, time_count, |
| time_mask); |
| } |
| |
| if (pass_count == 0) { |
| if (nonnull_count > 0) { |
| int skipped = 0; |
| value_decoder_->skip_int64(nonnull_count, skipped, value_in); |
| } |
| cur_value_index += time_count; |
| continue; |
| } |
| |
| int value_count = 0; |
| if (nonnull_count > 0) { |
| if (RET_FAIL(value_decoder_->read_batch_int64( |
| values, nonnull_count, value_count, value_in))) { |
| break; |
| } |
| } |
| |
| int val_idx = 0; |
| for (int i = 0; i < time_count; ++i) { |
| cur_value_index++; |
| if (filter != nullptr && !block_all_pass && !time_mask[i]) { |
| if (!is_null[i]) ++val_idx; |
| continue; |
| } |
| if (is_null[i]) { |
| if (UNLIKELY(!row_appender.add_row())) { |
| ret = E_OVERFLOW; |
| break; |
| } |
| row_appender.append(0, (char*)×[i], sizeof(int64_t)); |
| row_appender.append_null(1); |
| } else { |
| int64_t val = values[val_idx++]; |
| if (filter != nullptr && !block_all_pass && |
| !filter->satisfy(times[i], val)) { |
| continue; |
| } |
| if (UNLIKELY(!row_appender.add_row())) { |
| ret = E_OVERFLOW; |
| break; |
| } |
| row_appender.append(0, (char*)×[i], sizeof(int64_t)); |
| row_appender.append(1, (char*)&val, sizeof(int64_t)); |
| } |
| } |
| if (ret != E_OK) break; |
| } |
| return ret; |
| } |
| |
| int AlignedChunkReader::float_DECODE_TV_BATCH(ByteStream& time_in, |
| ByteStream& value_in, |
| RowAppender& row_appender, |
| Filter* filter) { |
| int ret = E_OK; |
| const int BATCH = 129; |
| int64_t times[BATCH]; |
| float values[BATCH]; |
| const uint32_t null_mask_base = 1 << 7; |
| |
| while (time_decoder_->has_remaining(time_in)) { |
| if (row_appender.remaining() < (uint32_t)BATCH) { |
| ret = E_OVERFLOW; |
| break; |
| } |
| |
| // Block-level time filter check |
| bool block_all_pass = false; |
| if (filter != nullptr) { |
| int64_t block_min, block_max; |
| int block_count; |
| if (time_decoder_->peek_next_block_range_int64( |
| time_in, block_min, block_max, block_count)) { |
| if (!filter->satisfy_start_end_time(block_min, block_max)) { |
| int skipped = 0; |
| time_decoder_->skip_peeked_block_int64(time_in, skipped); |
| int nonnull = 0; |
| for (int i = 0; i < block_count; ++i) { |
| int vi = cur_value_index + 1 + i; |
| if (!value_page_col_notnull_bitmap_.empty() && |
| ((value_page_col_notnull_bitmap_[vi / 8] & 0xFF) & |
| (null_mask_base >> (vi % 8))) != 0) { |
| ++nonnull; |
| } |
| } |
| cur_value_index += block_count; |
| if (nonnull > 0) { |
| int sk = 0; |
| value_decoder_->skip_float(nonnull, sk, value_in); |
| } |
| continue; |
| } |
| if (filter->contain_start_end_time(block_min, block_max)) { |
| block_all_pass = true; |
| } |
| } |
| } |
| |
| int time_count = 0; |
| if (RET_FAIL(time_decoder_->read_batch_int64( |
| times, BATCH, time_count, time_in))) { |
| break; |
| } |
| if (time_count == 0) break; |
| |
| bool is_null[BATCH]; |
| int nonnull_count = 0; |
| for (int i = 0; i < time_count; ++i) { |
| int vi = cur_value_index + 1 + i; |
| if (value_page_col_notnull_bitmap_.empty() || |
| ((value_page_col_notnull_bitmap_[vi / 8] & 0xFF) & |
| (null_mask_base >> (vi % 8))) == 0) { |
| is_null[i] = true; |
| } else { |
| is_null[i] = false; |
| ++nonnull_count; |
| } |
| } |
| |
| bool time_mask[BATCH]; |
| int pass_count = time_count; |
| if (filter != nullptr && !block_all_pass) { |
| pass_count = filter->satisfy_batch_time(times, time_count, |
| time_mask); |
| } |
| |
| if (pass_count == 0) { |
| if (nonnull_count > 0) { |
| int skipped = 0; |
| value_decoder_->skip_float(nonnull_count, skipped, value_in); |
| } |
| cur_value_index += time_count; |
| continue; |
| } |
| |
| int value_count = 0; |
| if (nonnull_count > 0) { |
| if (RET_FAIL(value_decoder_->read_batch_float( |
| values, nonnull_count, value_count, value_in))) { |
| break; |
| } |
| } |
| |
| int val_idx = 0; |
| for (int i = 0; i < time_count; ++i) { |
| cur_value_index++; |
| if (filter != nullptr && !block_all_pass && !time_mask[i]) { |
| if (!is_null[i]) ++val_idx; |
| continue; |
| } |
| if (is_null[i]) { |
| if (UNLIKELY(!row_appender.add_row())) { |
| ret = E_OVERFLOW; |
| break; |
| } |
| row_appender.append(0, (char*)×[i], sizeof(int64_t)); |
| row_appender.append_null(1); |
| } else { |
| float val = values[val_idx++]; |
| if (UNLIKELY(!row_appender.add_row())) { |
| ret = E_OVERFLOW; |
| break; |
| } |
| row_appender.append(0, (char*)×[i], sizeof(int64_t)); |
| row_appender.append(1, (char*)&val, sizeof(float)); |
| } |
| } |
| if (ret != E_OK) break; |
| } |
| return ret; |
| } |
| |
| int AlignedChunkReader::double_DECODE_TV_BATCH(ByteStream& time_in, |
| ByteStream& value_in, |
| RowAppender& row_appender, |
| Filter* filter) { |
| int ret = E_OK; |
| const int BATCH = 129; |
| int64_t times[BATCH]; |
| double values[BATCH]; |
| const uint32_t null_mask_base = 1 << 7; |
| |
| while (time_decoder_->has_remaining(time_in)) { |
| if (row_appender.remaining() < (uint32_t)BATCH) { |
| ret = E_OVERFLOW; |
| break; |
| } |
| |
| // Block-level time filter check |
| bool block_all_pass = false; |
| if (filter != nullptr) { |
| int64_t block_min, block_max; |
| int block_count; |
| if (time_decoder_->peek_next_block_range_int64( |
| time_in, block_min, block_max, block_count)) { |
| if (!filter->satisfy_start_end_time(block_min, block_max)) { |
| int skipped = 0; |
| time_decoder_->skip_peeked_block_int64(time_in, skipped); |
| int nonnull = 0; |
| for (int i = 0; i < block_count; ++i) { |
| int vi = cur_value_index + 1 + i; |
| if (!value_page_col_notnull_bitmap_.empty() && |
| ((value_page_col_notnull_bitmap_[vi / 8] & 0xFF) & |
| (null_mask_base >> (vi % 8))) != 0) { |
| ++nonnull; |
| } |
| } |
| cur_value_index += block_count; |
| if (nonnull > 0) { |
| int sk = 0; |
| value_decoder_->skip_double(nonnull, sk, value_in); |
| } |
| continue; |
| } |
| if (filter->contain_start_end_time(block_min, block_max)) { |
| block_all_pass = true; |
| } |
| } |
| } |
| |
| int time_count = 0; |
| if (RET_FAIL(time_decoder_->read_batch_int64( |
| times, BATCH, time_count, time_in))) { |
| break; |
| } |
| if (time_count == 0) break; |
| |
| bool is_null[BATCH]; |
| int nonnull_count = 0; |
| for (int i = 0; i < time_count; ++i) { |
| int vi = cur_value_index + 1 + i; |
| if (value_page_col_notnull_bitmap_.empty() || |
| ((value_page_col_notnull_bitmap_[vi / 8] & 0xFF) & |
| (null_mask_base >> (vi % 8))) == 0) { |
| is_null[i] = true; |
| } else { |
| is_null[i] = false; |
| ++nonnull_count; |
| } |
| } |
| |
| bool time_mask[BATCH]; |
| int pass_count = time_count; |
| if (filter != nullptr && !block_all_pass) { |
| pass_count = filter->satisfy_batch_time(times, time_count, |
| time_mask); |
| } |
| |
| if (pass_count == 0) { |
| if (nonnull_count > 0) { |
| int skipped = 0; |
| value_decoder_->skip_double(nonnull_count, skipped, value_in); |
| } |
| cur_value_index += time_count; |
| continue; |
| } |
| |
| int value_count = 0; |
| if (nonnull_count > 0) { |
| if (RET_FAIL(value_decoder_->read_batch_double( |
| values, nonnull_count, value_count, value_in))) { |
| break; |
| } |
| } |
| |
| int val_idx = 0; |
| for (int i = 0; i < time_count; ++i) { |
| cur_value_index++; |
| if (filter != nullptr && !block_all_pass && !time_mask[i]) { |
| if (!is_null[i]) ++val_idx; |
| continue; |
| } |
| if (is_null[i]) { |
| if (UNLIKELY(!row_appender.add_row())) { |
| ret = E_OVERFLOW; |
| break; |
| } |
| row_appender.append(0, (char*)×[i], sizeof(int64_t)); |
| row_appender.append_null(1); |
| } else { |
| double val = values[val_idx++]; |
| if (UNLIKELY(!row_appender.add_row())) { |
| ret = E_OVERFLOW; |
| break; |
| } |
| row_appender.append(0, (char*)×[i], sizeof(int64_t)); |
| row_appender.append(1, (char*)&val, sizeof(double)); |
| } |
| } |
| if (ret != E_OK) break; |
| } |
| return ret; |
| } |
| |
| int AlignedChunkReader::decode_tv_buf_into_tsblock_by_datatype( |
| ByteStream& time_in, ByteStream& value_in, TsBlock* ret_tsblock, |
| Filter* filter, common::PageArena* pa) { |
| int ret = E_OK; |
| RowAppender row_appender(ret_tsblock); |
| switch (value_chunk_header_.data_type_) { |
| case common::BOOLEAN: |
| DECODE_TYPED_TV_INTO_TSBLOCK(bool, boolean, time_in_, value_in_, |
| row_appender); |
| break; |
| case common::DATE: |
| case common::INT32: |
| ret = i32_DECODE_TV_BATCH(time_in_, value_in_, |
| row_appender, filter); |
| break; |
| case common::TIMESTAMP: |
| case common::INT64: |
| ret = i64_DECODE_TV_BATCH(time_in_, value_in_, |
| row_appender, filter); |
| break; |
| case common::FLOAT: |
| ret = float_DECODE_TV_BATCH(time_in_, value_in_, |
| row_appender, filter); |
| break; |
| case common::DOUBLE: |
| ret = double_DECODE_TV_BATCH(time_in_, value_in_, |
| row_appender, filter); |
| break; |
| case common::STRING: |
| case common::BLOB: |
| case common::TEXT: |
| ret = STRING_DECODE_TYPED_TV_INTO_TSBLOCK( |
| time_in, value_in, row_appender, *pa, filter); |
| break; |
| default: |
| ret = E_NOT_SUPPORT; |
| ASSERT(false); |
| } |
| if (ret_tsblock->get_row_count() == 0 && ret == E_OK) { |
| ret = E_NO_MORE_DATA; |
| } |
| return ret; |
| } |
| |
| int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK( |
| ByteStream& time_in, ByteStream& value_in, RowAppender& row_appender, |
| PageArena& pa, Filter* filter) { |
| int ret = E_OK; |
| int64_t time = 0; |
| common::String value; |
| uint32_t mask = 1 << 7; |
| while (time_decoder_->has_remaining(time_in)) { |
| cur_value_index++; |
| bool should_read_data = true; |
| if (value_page_col_notnull_bitmap_.empty() || |
| ((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) & |
| (mask >> (cur_value_index % 8))) == 0) { |
| should_read_data = false; |
| } |
| |
| if (should_read_data) { |
| assert(value_decoder_->has_remaining(value_in)); |
| if (!value_decoder_->has_remaining(value_in)) { |
| return E_DATA_INCONSISTENCY; |
| } |
| } |
| |
| if (UNLIKELY(!row_appender.add_row())) { |
| ret = E_OVERFLOW; |
| cur_value_index--; |
| break; |
| } else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) { |
| } else if (should_read_data && |
| RET_FAIL(value_decoder_->read_String(value, pa, value_in))) { |
| } else if (filter != nullptr && !filter->satisfy(time, value)) { |
| row_appender.backoff_add_row(); |
| continue; |
| } else { |
| row_appender.append(0, (char*)&time, sizeof(time)); |
| if (!should_read_data) { |
| row_appender.append_null(1); |
| } else { |
| row_appender.append(1, value.buf_, value.len_); |
| } |
| } |
| } |
| return ret; |
| } |
| |
| // ══════════════════════════════════════════════════════════════════════════ |
| // Multi-value AlignedChunkReader implementation |
| // ══════════════════════════════════════════════════════════════════════════ |
| |
| int AlignedChunkReader::load_by_aligned_meta_multi( |
| ChunkMeta* time_chunk_meta, |
| const std::vector<ChunkMeta*>& value_metas) { |
| int ret = E_OK; |
| multi_value_mode_ = true; |
| time_chunk_meta_ = time_chunk_meta; |
| |
| // ── Load time chunk header ── |
| file_data_time_buf_size_ = 1024; |
| int32_t ret_read_len = 0; |
| char* time_file_data_buf = |
| (char*)mem_alloc(file_data_time_buf_size_, MOD_CHUNK_READER); |
| if (IS_NULL(time_file_data_buf)) return E_OOM; |
| |
| ret = read_file_->read(time_chunk_meta_->offset_of_chunk_header_, |
| time_file_data_buf, file_data_time_buf_size_, |
| ret_read_len); |
| if (IS_SUCC(ret) && ret_read_len < ChunkHeader::MIN_SERIALIZED_SIZE) { |
| ret = E_TSFILE_CORRUPTED; |
| mem_free(time_file_data_buf); |
| return ret; |
| } |
| if (IS_SUCC(ret)) { |
| time_in_stream_.wrap_from(time_file_data_buf, ret_read_len); |
| if (RET_FAIL(time_chunk_header_.deserialize_from(time_in_stream_))) { |
| return ret; |
| } |
| time_chunk_visit_offset_ = time_in_stream_.read_pos(); |
| } |
| |
| // Alloc time decoder/compressor |
| if (IS_SUCC(ret)) { |
| if (RET_FAIL(alloc_compressor_and_decoder( |
| time_decoder_, time_compressor_, |
| time_chunk_header_.encoding_type_, |
| time_chunk_header_.data_type_, |
| time_chunk_header_.compression_type_))) { |
| return ret; |
| } |
| } |
| |
| // ── Load each value column ── |
| // Reuse existing ValueColumnState objects if count matches (reset() already |
| // cleared their internal state). Otherwise, recreate. |
| if (value_columns_.size() != value_metas.size()) { |
| for (auto* p : value_columns_) delete p; |
| value_columns_.clear(); |
| value_columns_.reserve(value_metas.size()); |
| for (size_t c = 0; c < value_metas.size(); c++) { |
| value_columns_.push_back(new ValueColumnState); |
| } |
| } |
| for (size_t c = 0; c < value_metas.size() && IS_SUCC(ret); c++) { |
| auto* col = value_columns_[c]; |
| col->chunk_meta = value_metas[c]; |
| col->file_data_buf_size = 1024; |
| ret_read_len = 0; |
| char* vbuf = |
| (char*)mem_alloc(col->file_data_buf_size, MOD_CHUNK_READER); |
| if (IS_NULL(vbuf)) return E_OOM; |
| |
| ret = read_file_->read(col->chunk_meta->offset_of_chunk_header_, vbuf, |
| col->file_data_buf_size, ret_read_len); |
| if (IS_SUCC(ret) && ret_read_len < ChunkHeader::MIN_SERIALIZED_SIZE) { |
| ret = E_TSFILE_CORRUPTED; |
| mem_free(vbuf); |
| break; |
| } |
| if (IS_SUCC(ret)) { |
| col->in_stream.wrap_from(vbuf, ret_read_len); |
| if (RET_FAIL( |
| col->chunk_header.deserialize_from(col->in_stream))) { |
| break; |
| } |
| col->chunk_visit_offset = col->in_stream.read_pos(); |
| if (RET_FAIL(alloc_compressor_and_decoder( |
| col->decoder, col->compressor, |
| col->chunk_header.encoding_type_, |
| col->chunk_header.data_type_, |
| col->chunk_header.compression_type_))) { |
| break; |
| } |
| } |
| } |
| |
| // Create thread pool for parallel decode when we have multiple columns. |
| if (IS_SUCC(ret) && value_columns_.size() > 1 && |
| decode_pool_ == nullptr) { |
| int nthreads = std::min((int)value_columns_.size(), 4); |
| decode_pool_ = new common::DecodeThreadPool(nthreads); |
| } |
| |
| return ret; |
| } |
| |
| bool AlignedChunkReader::has_more_data_multi() const { |
| if (prev_time_page_not_finish() || prev_any_value_page_not_finish_multi()) { |
| return true; |
| } |
| if (time_chunk_visit_offset_ - time_chunk_header_.serialized_size_ < |
| time_chunk_header_.data_size_) { |
| return true; |
| } |
| for (const auto* col : value_columns_) { |
| if (col->chunk_visit_offset - col->chunk_header.serialized_size_ < |
| col->chunk_header.data_size_) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| bool AlignedChunkReader::prev_any_value_page_not_finish_multi() const { |
| for (const auto* col : value_columns_) { |
| if ((col->decoder && col->decoder->has_remaining(col->in)) || |
| col->in.has_remaining()) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| int AlignedChunkReader::get_next_page_multi(TsBlock* ret_tsblock, |
| Filter* oneshoot_filter, |
| PageArena& pa) { |
| int ret = E_OK; |
| Filter* filter = |
| (oneshoot_filter != nullptr ? oneshoot_filter : time_filter_); |
| |
| bool pt = prev_time_page_not_finish(); |
| bool pv = prev_any_value_page_not_finish_multi(); |
| if (pt && pv) { |
| ret = decode_time_value_buf_into_tsblock_multi(ret_tsblock, filter, |
| &pa); |
| return ret; |
| } |
| if (!pt && !pv) { |
| while (IS_SUCC(ret)) { |
| // Get time page header |
| if (RET_FAIL(get_cur_page_header( |
| time_chunk_meta_, time_in_stream_, cur_time_page_header_, |
| time_chunk_visit_offset_, time_chunk_header_))) { |
| break; |
| } |
| // Get each value column's page header |
| for (size_t c = 0; c < value_columns_.size() && IS_SUCC(ret); |
| c++) { |
| auto* col = value_columns_[c]; |
| if (RET_FAIL(get_cur_page_header( |
| col->chunk_meta, col->in_stream, col->cur_page_header, |
| col->chunk_visit_offset, col->chunk_header, |
| &col->file_data_buf_size))) { |
| } |
| } |
| if (IS_FAIL(ret)) break; |
| |
| if (cur_page_statisify_filter_multi(filter)) { |
| break; |
| } |
| if (RET_FAIL(skip_cur_page_multi())) break; |
| if (!has_more_data()) { |
| ret = E_NO_MORE_DATA; |
| break; |
| } |
| } |
| if (IS_SUCC(ret)) { |
| ret = decode_cur_time_page_data(); |
| if (IS_SUCC(ret)) { |
| ret = decode_cur_value_pages_multi(); |
| } |
| } |
| } |
| if (IS_SUCC(ret)) { |
| ret = decode_time_value_buf_into_tsblock_multi(ret_tsblock, filter, |
| &pa); |
| } |
| return ret; |
| } |
| |
| bool AlignedChunkReader::cur_page_statisify_filter_multi(Filter* filter) { |
| bool time_satisfy = filter == nullptr || |
| cur_time_page_header_.statistic_ == nullptr || |
| filter->satisfy(cur_time_page_header_.statistic_); |
| return time_satisfy; |
| } |
| |
| int AlignedChunkReader::skip_cur_page_multi() { |
| time_chunk_visit_offset_ += cur_time_page_header_.compressed_size_; |
| time_in_stream_.wrapped_buf_advance_read_pos( |
| cur_time_page_header_.compressed_size_); |
| for (auto* col : value_columns_) { |
| col->chunk_visit_offset += col->cur_page_header.compressed_size_; |
| col->in_stream.wrapped_buf_advance_read_pos( |
| col->cur_page_header.compressed_size_); |
| } |
| return E_OK; |
| } |
| |
| int AlignedChunkReader::decode_cur_value_pages_multi() { |
| int ret = E_OK; |
| // Phase 1: Serial IO — ensure each column's page data is in memory. |
| for (size_t c = 0; c < value_columns_.size() && IS_SUCC(ret); c++) { |
| ret = ensure_value_page_loaded(*value_columns_[c]); |
| } |
| if (IS_FAIL(ret)) return ret; |
| |
| // Phase 2: Parallel CPU — decompress + parse bitmap + reset decoder. |
| if (value_columns_.size() > 1 && decode_pool_ != nullptr) { |
| std::vector<int> col_rets(value_columns_.size(), E_OK); |
| for (size_t c = 0; c < value_columns_.size(); c++) { |
| auto* col = value_columns_[c]; |
| int* col_ret = &col_rets[c]; |
| decode_pool_->submit([col, col_ret] { |
| *col_ret = decompress_and_parse_value_page(*col); |
| }); |
| } |
| decode_pool_->wait_all(); |
| for (size_t c = 0; c < col_rets.size(); c++) { |
| if (IS_FAIL(col_rets[c])) return col_rets[c]; |
| } |
| } else { |
| for (size_t c = 0; c < value_columns_.size() && IS_SUCC(ret); c++) { |
| ret = decompress_and_parse_value_page(*value_columns_[c]); |
| } |
| } |
| return ret; |
| } |
| |
| int AlignedChunkReader::decode_cur_value_page_data_for(ValueColumnState& col) { |
| int ret = E_OK; |
| |
| // Step 1: ensure full page data is loaded |
| if (col.in_stream.remaining_size() < |
| col.cur_page_header.compressed_size_) { |
| if (RET_FAIL(read_from_file_and_rewrap( |
| col.in_stream, col.chunk_meta, col.chunk_visit_offset, |
| col.file_data_buf_size, |
| col.cur_page_header.compressed_size_))) { |
| return ret; |
| } |
| } |
| |
| if (col.cur_page_header.compressed_size_ == 0) { |
| col.in.wrap_from(nullptr, 0); |
| return E_OK; |
| } |
| |
| // Step 2: uncompress |
| char* compressed_buf = |
| col.in_stream.get_wrapped_buf() + col.in_stream.read_pos(); |
| uint32_t compressed_size = col.cur_page_header.compressed_size_; |
| col.in_stream.wrapped_buf_advance_read_pos(compressed_size); |
| col.chunk_visit_offset += compressed_size; |
| |
| char* uncompressed_buf = nullptr; |
| uint32_t uncompressed_size = 0; |
| if (RET_FAIL(col.compressor->reset(false))) { |
| return ret; |
| } |
| if (RET_FAIL(col.compressor->uncompress(compressed_buf, compressed_size, |
| uncompressed_buf, |
| uncompressed_size))) { |
| return ret; |
| } |
| col.uncompressed_buf = uncompressed_buf; |
| |
| if (uncompressed_size != col.cur_page_header.uncompressed_size_) { |
| return E_TSFILE_CORRUPTED; |
| } |
| |
| // Step 3: parse bitmap + value data |
| uint32_t offset = 0; |
| uint32_t data_num = SerializationUtil::read_ui32(uncompressed_buf); |
| offset += sizeof(uint32_t); |
| col.notnull_bitmap.resize((data_num + 7) / 8); |
| for (size_t i = 0; i < col.notnull_bitmap.size(); i++) { |
| col.notnull_bitmap[i] = *(uncompressed_buf + offset); |
| offset++; |
| } |
| col.cur_value_index = -1; |
| |
| char* value_buf = uncompressed_buf + offset; |
| uint32_t value_buf_size = uncompressed_size - offset; |
| col.decoder->reset(); |
| col.in.wrap_from(value_buf, value_buf_size); |
| return ret; |
| } |
| |
| int AlignedChunkReader::ensure_value_page_loaded(ValueColumnState& col) { |
| int ret = E_OK; |
| if (col.in_stream.remaining_size() < |
| col.cur_page_header.compressed_size_) { |
| if (RET_FAIL(read_from_file_and_rewrap( |
| col.in_stream, col.chunk_meta, col.chunk_visit_offset, |
| col.file_data_buf_size, |
| col.cur_page_header.compressed_size_))) { |
| return ret; |
| } |
| } |
| return ret; |
| } |
| |
| int AlignedChunkReader::decompress_and_parse_value_page( |
| ValueColumnState& col) { |
| int ret = E_OK; |
| |
| if (col.cur_page_header.compressed_size_ == 0) { |
| col.in.wrap_from(nullptr, 0); |
| return E_OK; |
| } |
| |
| // Decompress |
| char* compressed_buf = |
| col.in_stream.get_wrapped_buf() + col.in_stream.read_pos(); |
| uint32_t compressed_size = col.cur_page_header.compressed_size_; |
| col.in_stream.wrapped_buf_advance_read_pos(compressed_size); |
| col.chunk_visit_offset += compressed_size; |
| |
| char* uncompressed_buf = nullptr; |
| uint32_t uncompressed_size = 0; |
| if (RET_FAIL(col.compressor->reset(false))) { |
| return ret; |
| } |
| if (RET_FAIL(col.compressor->uncompress(compressed_buf, compressed_size, |
| uncompressed_buf, |
| uncompressed_size))) { |
| return ret; |
| } |
| col.uncompressed_buf = uncompressed_buf; |
| |
| if (uncompressed_size != col.cur_page_header.uncompressed_size_) { |
| return E_TSFILE_CORRUPTED; |
| } |
| |
| // Parse bitmap + value data |
| uint32_t offset = 0; |
| uint32_t data_num = SerializationUtil::read_ui32(uncompressed_buf); |
| offset += sizeof(uint32_t); |
| col.notnull_bitmap.resize((data_num + 7) / 8); |
| for (size_t i = 0; i < col.notnull_bitmap.size(); i++) { |
| col.notnull_bitmap[i] = *(uncompressed_buf + offset); |
| offset++; |
| } |
| col.cur_value_index = -1; |
| |
| char* value_buf = uncompressed_buf + offset; |
| uint32_t value_buf_size = uncompressed_size - offset; |
| col.decoder->reset(); |
| col.in.wrap_from(value_buf, value_buf_size); |
| return ret; |
| } |
| |
| int AlignedChunkReader::decode_time_value_buf_into_tsblock_multi( |
| TsBlock*& ret_tsblock, Filter* filter, PageArena* pa) { |
| int ret = E_OK; |
| RowAppender row_appender(ret_tsblock); |
| ret = multi_DECODE_TV_BATCH(ret_tsblock, row_appender, filter, pa); |
| |
| // Release uncompressed buffers if pages are done |
| if (ret != E_OVERFLOW) { |
| if (time_uncompressed_buf_ != nullptr) { |
| time_compressor_->after_uncompress(time_uncompressed_buf_); |
| time_uncompressed_buf_ = nullptr; |
| } |
| for (auto* col : value_columns_) { |
| if (col->uncompressed_buf != nullptr) { |
| col->compressor->after_uncompress(col->uncompressed_buf); |
| col->uncompressed_buf = nullptr; |
| } |
| if (!(col->decoder && col->decoder->has_remaining(col->in)) && |
| !col->in.has_remaining()) { |
| col->in.reset(); |
| } |
| col->notnull_bitmap.clear(); |
| col->notnull_bitmap.shrink_to_fit(); |
| } |
| if (!prev_time_page_not_finish()) { |
| time_in_.reset(); |
| } |
| } else { |
| ret = E_OK; |
| } |
| return ret; |
| } |
| |
| int AlignedChunkReader::multi_DECODE_TV_BATCH(TsBlock* ret_tsblock, |
| RowAppender& row_appender, |
| Filter* filter, |
| PageArena* pa) { |
| int ret = E_OK; |
| const int BATCH = 129; |
| int64_t times[BATCH]; |
| const uint32_t null_mask_base = 1 << 7; |
| const uint32_t num_cols = value_columns_.size(); |
| |
| |
| while (time_decoder_->has_remaining(time_in_)) { |
| if (row_appender.remaining() < (uint32_t)BATCH) { |
| ret = E_OVERFLOW; |
| break; |
| } |
| |
| // ── Phase 1: Decode a batch of timestamps ── |
| int time_count = 0; |
| if (RET_FAIL(time_decoder_->read_batch_int64(times, BATCH, time_count, |
| time_in_))) { |
| break; |
| } |
| if (time_count == 0) break; |
| |
| // ── Phase 2: Apply time filter ── |
| bool time_mask[BATCH]; |
| bool block_all_pass = (filter == nullptr); |
| int pass_count = time_count; |
| if (!block_all_pass) { |
| pass_count = |
| filter->satisfy_batch_time(times, time_count, time_mask); |
| } |
| |
| // ── Phase 3: Per-column null check + value decode ── |
| // For each column, compute null flags and decode non-null values. |
| // We store decoded values in column-specific buffers. |
| // Max 8 bytes per value, 129 values per batch. |
| struct ColBatch { |
| bool is_null[BATCH]; |
| int nonnull_count; |
| // Value buffer — up to 129 * 8 bytes = 1032 bytes on stack |
| char val_buf[BATCH * 8]; |
| int val_count; |
| }; |
| // Allocate on heap if many columns, stack for small counts |
| std::vector<ColBatch> col_batches(num_cols); |
| |
| for (uint32_t c = 0; c < num_cols; c++) { |
| auto* col = value_columns_[c]; |
| auto& cb = col_batches[c]; |
| cb.nonnull_count = 0; |
| cb.val_count = 0; |
| for (int i = 0; i < time_count; i++) { |
| int vi = col->cur_value_index + 1 + i; |
| if (col->notnull_bitmap.empty() || |
| ((col->notnull_bitmap[vi / 8] & 0xFF) & |
| (null_mask_base >> (vi % 8))) == 0) { |
| cb.is_null[i] = true; |
| } else { |
| cb.is_null[i] = false; |
| cb.nonnull_count++; |
| } |
| } |
| |
| // Skip values if no rows pass time filter |
| if (pass_count == 0 && cb.nonnull_count > 0) { |
| switch (col->chunk_header.data_type_) { |
| case common::INT32: |
| case common::DATE: { |
| int sk = 0; |
| col->decoder->skip_int32(cb.nonnull_count, sk, col->in); |
| break; |
| } |
| case common::INT64: |
| case common::TIMESTAMP: { |
| int sk = 0; |
| col->decoder->skip_int64(cb.nonnull_count, sk, col->in); |
| break; |
| } |
| case common::FLOAT: { |
| int sk = 0; |
| col->decoder->skip_float(cb.nonnull_count, sk, col->in); |
| break; |
| } |
| case common::DOUBLE: { |
| int sk = 0; |
| col->decoder->skip_double(cb.nonnull_count, sk, col->in); |
| break; |
| } |
| default: |
| // STRING etc - fall through to value decode |
| break; |
| } |
| cb.nonnull_count = 0; // already skipped |
| } |
| |
| // Decode non-null values |
| if (cb.nonnull_count > 0) { |
| switch (col->chunk_header.data_type_) { |
| case common::INT32: |
| case common::DATE: |
| col->decoder->read_batch_int32( |
| reinterpret_cast<int32_t*>(cb.val_buf), |
| cb.nonnull_count, cb.val_count, col->in); |
| break; |
| case common::INT64: |
| case common::TIMESTAMP: |
| col->decoder->read_batch_int64( |
| reinterpret_cast<int64_t*>(cb.val_buf), |
| cb.nonnull_count, cb.val_count, col->in); |
| break; |
| case common::FLOAT: |
| col->decoder->read_batch_float( |
| reinterpret_cast<float*>(cb.val_buf), |
| cb.nonnull_count, cb.val_count, col->in); |
| break; |
| case common::DOUBLE: |
| col->decoder->read_batch_double( |
| reinterpret_cast<double*>(cb.val_buf), |
| cb.nonnull_count, cb.val_count, col->in); |
| break; |
| default: |
| // STRING handled below in scatter loop |
| break; |
| } |
| } |
| } |
| |
| // ── Phase 4: Skip if no rows pass ── |
| if (pass_count == 0) { |
| for (uint32_t c = 0; c < num_cols; c++) { |
| value_columns_[c]->cur_value_index += time_count; |
| } |
| continue; |
| } |
| |
| // ── Phase 5: Scatter into TsBlock ── |
| |
| // Fast path: all rows pass filter AND all columns have no nulls |
| // → batch memcpy directly into Vector buffers. |
| if (pass_count == time_count) { |
| bool all_nonnull = true; |
| for (uint32_t c = 0; c < num_cols; c++) { |
| if (col_batches[c].nonnull_count != time_count) { |
| all_nonnull = false; |
| break; |
| } |
| } |
| if (all_nonnull) { |
| // Batch append time column |
| common::Vector* time_vec = ret_tsblock->get_vector(0); |
| time_vec->get_value_data().append_fixed_value( |
| (const char*)times, |
| static_cast<uint32_t>(time_count) * sizeof(int64_t)); |
| // Batch append each value column |
| for (uint32_t c = 0; c < num_cols; c++) { |
| auto& cb = col_batches[c]; |
| auto* col = value_columns_[c]; |
| uint32_t elem_size = common::get_data_type_size( |
| col->chunk_header.data_type_); |
| common::Vector* vec = ret_tsblock->get_vector(c + 1); |
| vec->get_value_data().append_fixed_value( |
| cb.val_buf, |
| static_cast<uint32_t>(cb.val_count) * elem_size); |
| col->cur_value_index += time_count; |
| } |
| row_appender.add_rows(static_cast<uint32_t>(time_count)); |
| continue; |
| } |
| } |
| |
| // Slow path: per-row scatter (has filter or has nulls) |
| std::vector<int> val_idx(num_cols, 0); |
| |
| for (int i = 0; i < time_count; i++) { |
| bool passes = block_all_pass || time_mask[i]; |
| |
| if (!passes) { |
| for (uint32_t c = 0; c < num_cols; c++) { |
| value_columns_[c]->cur_value_index++; |
| if (!col_batches[c].is_null[i]) val_idx[c]++; |
| } |
| continue; |
| } |
| |
| if (UNLIKELY(!row_appender.add_row())) { |
| ret = E_OVERFLOW; |
| break; |
| } |
| |
| row_appender.append(0, (char*)×[i], sizeof(int64_t)); |
| |
| for (uint32_t c = 0; c < num_cols; c++) { |
| value_columns_[c]->cur_value_index++; |
| auto& cb = col_batches[c]; |
| auto* col = value_columns_[c]; |
| |
| if (cb.is_null[i]) { |
| row_appender.append_null(c + 1); |
| } else { |
| uint32_t elem_size = |
| common::get_data_type_size(col->chunk_header.data_type_); |
| row_appender.append(c + 1, |
| cb.val_buf + val_idx[c] * elem_size, |
| elem_size); |
| val_idx[c]++; |
| } |
| } |
| } |
| if (ret != E_OK) break; |
| } |
| return ret; |
| } |
| |
| } // end namespace storage |