blob: 393e30be2533352d5ce83daea79bbedb66827a21 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef READER_CHUNK_ALIGNED_READER_H
#define READER_CHUNK_ALIGNED_READER_H
#include "common/allocator/my_string.h"
#include "common/tsfile_common.h"
#include "compress/compressor.h"
#include "encoding/decoder.h"
#include "file/read_file.h"
#include "reader/filter/filter.h"
#include "reader/ichunk_reader.h"
#ifdef ENABLE_THREADS
namespace common {
class ThreadPool;
}
#endif
namespace storage {
// Classification given to each page while scanning a chunk for chunk-level
// parallel decode. Underlying values are 0, 1, 2 in declaration order.
// NOTE(review): the per-enumerator meanings below are inferred from the names
// only — confirm against scan_chunk_pages().
enum class PagePassType {
    SKIP,       // presumably: page is excluded and not decoded
    FULL_PASS,  // presumably: every row of the page is accepted
    BOUNDARY    // presumably: page straddles the filter, rows checked one by one
};
// Per-page record filled in during the chunk scan phase of chunk-level
// parallel decode: the page's classification plus, for the time column and
// for every value column, a file offset and compressed / uncompressed sizes
// (meaning taken from the field names).
struct ChunkPageInfo {
    PagePassType pass_type{PagePassType::SKIP};

    // Time column page.
    int64_t time_file_offset{0};
    uint32_t time_compressed_size{0};
    uint32_t time_uncompressed_size{0};

    // Value column pages; presumably one entry per value column, in column
    // order — confirm against the scan code.
    std::vector<int64_t> value_file_offsets;
    std::vector<uint32_t> value_compressed_sizes;
    std::vector<uint32_t> value_uncompressed_sizes;
};
// Timestamps of one page, decoded ahead of time for chunk-level decode.
// NOTE(review): `cursor` presumably indexes the next unread entry of
// `times`, and `count` the number of valid entries — confirm.
struct PageTimesDecoded {
    std::vector<int64_t> times{};
    int count{0};
    int cursor{0};
};
// Values of one (value column, page) pair, decoded ahead of time for
// chunk-level decode.
// NOTE(review): roles inferred from names — `bitmap` presumably marks
// non-null rows, `read_pos` the next value to consume; ownership of
// `uncompressed_buf` is not visible here.
struct ColPageDecoded {
    std::vector<char> values{};
    std::vector<uint8_t> bitmap{};
    uint32_t data_num{0};
    int nonnull_count{0};
    int read_pos{0};
    char* uncompressed_buf{nullptr};
};
// State held for each value column when AlignedChunkReader runs in
// multi-value mode.
struct ValueColumnState {
    // Chunk identity for this column.
    ChunkMeta* chunk_meta{nullptr};
    ChunkHeader chunk_header;

    // Codec objects; ownership is not visible from this header.
    Decoder* decoder{nullptr};
    Compressor* compressor{nullptr};

    // Input streams and read bookkeeping.
    // NOTE(review): names suggest `in_stream` carries chunk bytes read from
    // the file and `in` the current page payload — confirm.
    common::ByteStream in_stream;
    common::ByteStream in;
    char* uncompressed_buf{nullptr};
    int32_t file_data_buf_size{0};
    uint32_t chunk_visit_offset{0};

    // Current page.
    PageHeader cur_page_header;
    std::vector<uint8_t> notnull_bitmap;
    int32_t cur_value_index{-1};

    // Optional pre-decoded values for the current page.
    std::vector<char> predecoded_values;
    int predecoded_count{0};
    int predecoded_read_pos{0};
    bool predecoded{false};
};
/**
 * Reader for an aligned chunk group: one time chunk plus its value chunk(s).
 *
 * Two modes:
 *  - single-value mode (default): one time chunk and one value chunk, loaded
 *    via load_by_aligned_meta();
 *  - multi-value mode: one time chunk and N value chunks, loaded via
 *    load_by_aligned_meta_multi(), with per-column state in value_columns_.
 * multi_value_mode_ selects the behaviour of has_more_data(),
 * get_value_column_count() and get_value_chunk_header().
 *
 * NOTE(review): the destructor is defaulted while the reader holds raw
 * pointers (decoders, compressors, uncompressed buffers, ValueColumnState
 * entries). Cleanup presumably happens in destroy(), and a copied instance
 * would alias those pointers — confirm in the .cpp before copying readers.
 */
class AlignedChunkReader : public IChunkReader {
   public:
    AlignedChunkReader()
        : read_file_(nullptr),
          time_chunk_meta_(nullptr),
          value_chunk_meta_(nullptr),
          measurement_name_(),
          time_chunk_header_(),
          value_chunk_header_(),
          cur_time_page_header_(),
          cur_value_page_header_(),
          time_in_stream_(),
          value_in_stream_(),
          file_data_time_buf_size_(0),
          file_data_value_buf_size_(0),
          time_chunk_visit_offset_(0),
          value_chunk_visit_offset_(0),
          time_compressor_(nullptr),
          value_compressor_(nullptr),
          time_filter_(nullptr),
          time_decoder_(nullptr),
          value_decoder_(nullptr),
          time_in_(),
          value_in_(),
          time_uncompressed_buf_(nullptr),
          value_uncompressed_buf_(nullptr),
          // Scalar member with no in-class initializer: zero it so it never
          // holds an indeterminate value before the first value page is
          // decoded. Listed in declaration order to avoid -Wreorder.
          value_page_data_num_(0),
          cur_value_index(-1) {}
    int init(ReadFile* read_file, common::String m_name,
             common::TSDataType data_type, Filter* time_filter) override;
    void reset() override;
    void destroy() override;
    ~AlignedChunkReader() override = default;

    /// Returns true while there is more data to read.
    /// Multi-value mode delegates to has_more_data_multi(). Single-value mode
    /// reports true when the current value or time page still has undecoded
    /// input, or when either chunk's visit offset minus its serialized
    /// chunk-header size is still below the chunk's data size (presumably the
    /// visit offset is measured from the start of the chunk header).
    bool has_more_data() const override {
        if (multi_value_mode_) {
            return has_more_data_multi();
        }
        return prev_value_page_not_finish() || prev_time_page_not_finish() ||
               (value_chunk_visit_offset_ -
                    value_chunk_header_.serialized_size_ <
                value_chunk_header_.data_size_) ||
               (time_chunk_visit_offset_ - time_chunk_header_.serialized_size_ <
                time_chunk_header_.data_size_);
    }

    /// Returns value_chunk_header_ (the single-value-mode value chunk header)
    /// regardless of mode; use get_value_chunk_header() for a specific column.
    ChunkHeader& get_chunk_header() override { return value_chunk_header_; }
    int load_by_aligned_meta(ChunkMeta* time_meta,
                             ChunkMeta* value_meta) override;
    // Multi-value: load one time chunk + N value chunks.
    int load_by_aligned_meta_multi(ChunkMeta* time_meta,
                                   const std::vector<ChunkMeta*>& value_metas);
    int get_next_page(common::TsBlock* tsblock, Filter* oneshoot_filter,
                      common::PageArena& pa) override;
    int get_next_page(common::TsBlock* tsblock, Filter* oneshoot_filter,
                      common::PageArena& pa, int64_t min_time_hint,
                      int& row_offset, int& row_limit) override;

    /// Number of value columns served: value_columns_.size() in multi-value
    /// mode, otherwise 1.
    uint32_t get_value_column_count() const {
        return multi_value_mode_ ? static_cast<uint32_t>(value_columns_.size())
                                 : 1u;
    }

    /// Chunk header of value column `col`. Falls back to the single-value
    /// header when not in multi-value mode or when `col` is out of range.
    ChunkHeader& get_value_chunk_header(uint32_t col) {
        if (multi_value_mode_ && col < value_columns_.size()) {
            return value_columns_[col]->chunk_header;
        }
        return value_chunk_header_;
    }

    bool is_multi_value_mode() const { return multi_value_mode_; }
#ifdef ENABLE_THREADS
    /// Sets the pool used for decoding. The pool is borrowed, not owned;
    /// presumably the caller must keep it alive while the reader uses it.
    void set_decode_pool(common::ThreadPool* pool) { decode_pool_ = pool; }
#endif

   private:
    bool should_skip_page_by_time(int64_t min_time_hint);
    bool should_skip_page_by_offset(int& row_offset);

    /// True when the chunk type carries ONLY_ONE_PAGE_CHUNK_HEADER_MARKER.
    FORCE_INLINE bool chunk_has_only_one_page(
        const ChunkHeader& chunk_header) const {
        return (chunk_header.chunk_type_ & ONLY_ONE_PAGE_CHUNK_HEADER_MARKER) ==
               ONLY_ONE_PAGE_CHUNK_HEADER_MARKER;
    }
    int alloc_compressor_and_decoder(storage::Decoder*& decoder,
                                     storage::Compressor*& compressor,
                                     common::TSEncoding encoding,
                                     common::TSDataType data_type,
                                     common::CompressionType compression_type);
    int get_cur_page_header(ChunkMeta*& chunk_meta,
                            common::ByteStream& in_stream_,
                            PageHeader& cur_page_header_,
                            uint32_t& chunk_visit_offset,
                            ChunkHeader& chunk_header,
                            int32_t* override_buf_size = nullptr);
    int read_from_file_and_rewrap(common::ByteStream& in_stream_,
                                  ChunkMeta*& chunk_meta,
                                  uint32_t& chunk_visit_offset,
                                  int32_t& file_data_buf_size,
                                  int want_size = 0, bool may_shrink = true);
    bool cur_page_statisify_filter(Filter* filter);
    int skip_cur_page();
    int decode_cur_time_page_data();
    int decode_cur_value_page_data();
    int decode_time_value_buf_into_tsblock(common::TsBlock*& ret_tsblock,
                                           Filter* filter,
                                           common::PageArena* pa);

    /// Time page still has input: uses the pre-decoded cursor when
    /// timestamps were pre-decoded, otherwise the decoder / stream state.
    bool prev_time_page_not_finish() const {
        if (time_predecoded_) return page_time_cursor_ < page_time_count_;
        return (time_decoder_ && time_decoder_->has_remaining(time_in_)) ||
               time_in_.has_remaining();
    }

    /// Value page still has input according to the decoder / stream state.
    bool prev_value_page_not_finish() const {
        return (value_decoder_ && value_decoder_->has_remaining(value_in_)) ||
               value_in_.has_remaining();
    }
    int decode_tv_buf_into_tsblock_by_datatype(common::ByteStream& time_in,
                                               common::ByteStream& value_in,
                                               common::TsBlock* ret_tsblock,
                                               Filter* filter,
                                               common::PageArena* pa);
    int i32_DECODE_TYPED_TV_INTO_TSBLOCK(common::ByteStream& time_in,
                                         common::ByteStream& value_in,
                                         common::RowAppender& row_appender,
                                         Filter* filter);
    int STRING_DECODE_TYPED_TV_INTO_TSBLOCK(common::ByteStream& time_in,
                                            common::ByteStream& value_in,
                                            common::RowAppender& row_appender,
                                            common::PageArena& pa,
                                            Filter* filter);
    // ── Multi-value private methods ──────────────────────────────────────
    bool has_more_data_multi() const;
    bool prev_any_value_page_not_finish_multi() const;
    int get_next_page_multi(common::TsBlock* ret_tsblock,
                            Filter* oneshoot_filter, common::PageArena& pa);
    int get_next_page_multi_serial(common::TsBlock* ret_tsblock, Filter* filter,
                                   common::PageArena& pa);
    int skip_cur_page_multi();
    bool cur_page_statisify_filter_multi(Filter* filter);
    int decode_cur_value_pages_multi();
    int decode_cur_value_page_data_for(ValueColumnState& col);
    int ensure_value_page_loaded(ValueColumnState& col);
    static int decompress_and_parse_value_page(ValueColumnState& col);
    void predecode_all_timestamps();
    int decode_time_value_buf_into_tsblock_multi(common::TsBlock*& ret_tsblock,
                                                 Filter* filter,
                                                 common::PageArena* pa);
    int multi_DECODE_TV_BATCH(common::TsBlock* ret_tsblock,
                              common::RowAppender& row_appender, Filter* filter,
                              common::PageArena* pa);
    // ── Chunk-level parallel decode methods ─────────────────────────────
    int scan_chunk_pages(Filter* filter);
    int decode_chunk_pages();
    int scatter_chunk_pages(common::TsBlock* tsblock,
                            common::RowAppender& row_appender, Filter* filter,
                            common::PageArena* pa);
    void cleanup_chunk_decode();

   private:
    ReadFile* read_file_;
    // ── Single-value mode fields ─────────────────────────────────────────
    ChunkMeta* time_chunk_meta_;
    ChunkMeta* value_chunk_meta_;
    common::String measurement_name_;
    ChunkHeader time_chunk_header_;
    ChunkHeader value_chunk_header_;
    PageHeader cur_time_page_header_;
    PageHeader cur_value_page_header_;
    common::ByteStream time_in_stream_;
    common::ByteStream value_in_stream_;
    int32_t file_data_time_buf_size_;
    int32_t file_data_value_buf_size_;
    uint32_t time_chunk_visit_offset_;
    uint32_t value_chunk_visit_offset_;
    Compressor* time_compressor_;
    Compressor* value_compressor_;
    Filter* time_filter_;
    Decoder* time_decoder_;
    Decoder* value_decoder_;
    common::ByteStream time_in_;
    common::ByteStream value_in_;
    char* time_uncompressed_buf_;
    char* value_uncompressed_buf_;
    std::vector<uint8_t> value_page_col_notnull_bitmap_;
    // Zero-initialized by the constructor; presumably the current value
    // page's row count — confirm in the .cpp.
    uint32_t value_page_data_num_;
    int32_t cur_value_index;
    // ── Multi-value mode fields ──────────────────────────────────────────
    bool multi_value_mode_ = false;
    std::vector<ValueColumnState*> value_columns_;
    // Pre-decoded timestamps for page-level parallel decode.
    std::vector<int64_t> page_all_times_;
    int page_time_count_ = 0;
    int page_time_cursor_ = 0;
    bool time_predecoded_ = false;
    // ── Chunk-level parallel decode state ────────────────────────────────
    std::vector<ChunkPageInfo> chunk_pages_;
    std::vector<PageTimesDecoded> chunk_times_;
    std::vector<std::vector<ColPageDecoded>> chunk_cols_;
    int chunk_page_cursor_ = 0;
    bool chunk_level_active_ = false;
#ifdef ENABLE_THREADS
    common::ThreadPool* decode_pool_ = nullptr;  // borrowed, not owned
#endif
};
} // end namespace storage
#endif // READER_CHUNK_ALIGNED_READER_H