// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vparquet_group_reader.h"
#include <gen_cpp/Exprs_types.h>
#include <gen_cpp/Opcodes_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/parquet_types.h>
#include <string.h>
#include <algorithm>
#include <boost/iterator/iterator_facade.hpp>
#include <ostream>
#include "common/config.h"
#include "common/logging.h"
#include "common/object_pool.h"
#include "common/status.h"
#include "exprs/create_predicate_function.h"
#include "exprs/hybrid_set.h"
#include "gutil/stringprintf.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/thread_context.h"
#include "runtime/types.h"
#include "schema_desc.h"
#include "util/simd/bits.h"
#include "vec/columns/column_const.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
#include "vec/common/pod_array.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_number.h"
#include "vec/data_types/data_type_string.h"
#include "vec/exprs/vdirect_in_predicate.h"
#include "vec/exprs/vectorized_fn_call.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/exprs/vliteral.h"
#include "vec/exprs/vslot_ref.h"
#include "vparquet_column_reader.h"
namespace cctz {
class time_zone;
} // namespace cctz
namespace doris {
class RuntimeState;
namespace io {
class IOContext;
} // namespace io
} // namespace doris
namespace doris::vectorized {
const std::vector<int64_t> RowGroupReader::NO_DELETE = {};
RowGroupReader::RowGroupReader(io::FileReaderSPtr file_reader,
const std::vector<ParquetReadColumn>& read_columns,
const int32_t row_group_id, const tparquet::RowGroup& row_group,
cctz::time_zone* ctz, io::IOContext* io_ctx,
const PositionDeleteContext& position_delete_ctx,
const LazyReadContext& lazy_read_ctx, RuntimeState* state)
: _file_reader(file_reader),
_read_columns(read_columns),
_row_group_id(row_group_id),
_row_group_meta(row_group),
_remaining_rows(row_group.num_rows),
_ctz(ctz),
_io_ctx(io_ctx),
_position_delete_ctx(position_delete_ctx),
_lazy_read_ctx(lazy_read_ctx),
_state(state),
_obj_pool(new ObjectPool()) {}
RowGroupReader::~RowGroupReader() {
_column_readers.clear();
for (auto& ctx : _dict_filter_conjuncts) {
if (ctx) {
ctx->close(_state);
}
}
_obj_pool->clear();
}
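// Initialize the reader for one row group: record the effective row ranges,
// create a ParquetColumnReader for every read column, decide which string
// predicate columns can be evaluated against the dictionary instead of the
// decoded data, and collect the remaining filter conjuncts.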
Status RowGroupReader::init(
const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
std::unordered_map<int, tparquet::OffsetIndex>& col_offsets,
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
const std::unordered_map<std::string, int>* colname_to_slot_id,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts) {
_tuple_descriptor = tuple_descriptor;
_row_descriptor = row_descriptor;
_col_name_to_slot_id = colname_to_slot_id;
_slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
_text_converter.reset(new TextConverter('\\'));
if (not_single_slot_filter_conjuncts) {
_filter_conjuncts.insert(_filter_conjuncts.end(), not_single_slot_filter_conjuncts->begin(),
not_single_slot_filter_conjuncts->end());
}
_merge_read_ranges(row_ranges);
if (_read_columns.empty()) {
        // Query task that only selects columns derived from the file path.
return Status::OK();
}
const size_t MAX_GROUP_BUF_SIZE = config::parquet_rowgroup_max_buffer_mb << 20;
const size_t MAX_COLUMN_BUF_SIZE = config::parquet_column_max_buffer_mb << 20;
size_t max_buf_size = std::min(MAX_COLUMN_BUF_SIZE, MAX_GROUP_BUF_SIZE / _read_columns.size());
for (auto& read_col : _read_columns) {
auto field = const_cast<FieldSchema*>(schema.get_column(read_col._file_slot_name));
std::unique_ptr<ParquetColumnReader> reader;
RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, _row_group_meta,
_read_ranges, _ctz, _io_ctx, reader,
max_buf_size));
        if (reader == nullptr) {
            VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader failed";
            return Status::Corruption("Init row group reader failed");
        }
        auto col_iter = col_offsets.find(read_col._parquet_col_id);
        if (col_iter != col_offsets.end()) {
            // Pass the offset index stored in the caller's map instead of a stack
            // local, so a pointer retained by the reader stays valid after this loop.
            reader->add_offset_index(&col_iter->second);
        }
_column_readers[read_col._file_slot_name] = std::move(reader);
}
// Check if single slot can be filtered by dict.
if (!_slot_id_to_filter_conjuncts) {
return Status::OK();
}
    const std::vector<std::string>& predicate_col_names = _lazy_read_ctx.predicate_columns.first;
    const std::vector<int>& predicate_col_slot_ids = _lazy_read_ctx.predicate_columns.second;
    for (size_t i = 0; i < predicate_col_names.size(); ++i) {
        const std::string& predicate_col_name = predicate_col_names[i];
int slot_id = predicate_col_slot_ids[i];
auto field = const_cast<FieldSchema*>(schema.get_column(predicate_col_name));
if (_can_filter_by_dict(slot_id,
_row_group_meta.columns[field->physical_column_index].meta_data)) {
_dict_filter_cols.emplace_back(std::make_pair(predicate_col_name, slot_id));
} else {
if (_slot_id_to_filter_conjuncts->find(slot_id) !=
_slot_id_to_filter_conjuncts->end()) {
for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) {
_filter_conjuncts.push_back(ctx);
}
}
}
}
    // Add the conjuncts of predicate_partition_columns found in
    // _slot_id_to_filter_conjuncts (single-slot conjuncts) to _filter_conjuncts;
    // the others are added from not_single_slot_filter_conjuncts.
for (auto& kv : _lazy_read_ctx.predicate_partition_columns) {
auto& [value, slot_desc] = kv.second;
auto iter = _slot_id_to_filter_conjuncts->find(slot_desc->id());
if (iter != _slot_id_to_filter_conjuncts->end()) {
for (auto& ctx : iter->second) {
_filter_conjuncts.push_back(ctx);
}
}
}
RETURN_IF_ERROR(_rewrite_dict_predicates());
return Status::OK();
}
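// A column qualifies for dictionary filtering only if it is a string column
// with at least one single-slot filter conjunct, the whole column chunk is
// dictionary encoded, and none of its conjuncts is a null-check predicate.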
bool RowGroupReader::_can_filter_by_dict(int slot_id,
const tparquet::ColumnMetaData& column_metadata) {
SlotDescriptor* slot = nullptr;
const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
for (auto each : slots) {
if (each->id() == slot_id) {
slot = each;
break;
}
}
    // The slot may be absent from the tuple descriptor; such a column can't be
    // dict-filtered.
    if (slot == nullptr || !slot->type().is_string_type()) {
        return false;
    }
if (_slot_id_to_filter_conjuncts->find(slot_id) == _slot_id_to_filter_conjuncts->end()) {
return false;
}
if (!is_dictionary_encoded(column_metadata)) {
return false;
}
    // TODO: check exprs like 'a > 10 is null'; 'a > 10' should still be filterable by dict.
for (auto& ctx : _slot_id_to_filter_conjuncts->at(slot_id)) {
const auto& root_expr = ctx->root();
        if (root_expr->node_type() == TExprNodeType::FUNCTION_CALL) {
            const std::string& function_name = root_expr->fn().name.function_name;
            if (function_name == "is_null_pred" || function_name == "is_not_null_pred") {
                return false;
            }
        }
}
return true;
}
// This function is copied from
// https://github.com/apache/impala/blob/master/be/src/exec/parquet/hdfs-parquet-scanner.cc#L1717
bool RowGroupReader::is_dictionary_encoded(const tparquet::ColumnMetaData& column_metadata) {
// The Parquet spec allows for column chunks to have mixed encodings
// where some data pages are dictionary-encoded and others are plain
// encoded. For example, a Parquet file writer might start writing
// a column chunk as dictionary encoded, but it will switch to plain
// encoding if the dictionary grows too large.
//
// In order for dictionary filters to skip the entire row group,
// the conjuncts must be evaluated on column chunks that are entirely
// encoded with the dictionary encoding. There are two checks
// available to verify this:
// 1. The encoding_stats field on the column chunk metadata provides
// information about the number of data pages written in each
// format. This allows for a specific check of whether all the
// data pages are dictionary encoded.
// 2. The encodings field on the column chunk metadata lists the
// encodings used. If this list contains the dictionary encoding
// and does not include unexpected encodings (i.e. encodings not
// associated with definition/repetition levels), then it is entirely
// dictionary encoded.
if (column_metadata.__isset.encoding_stats) {
// Condition #1 above
for (const tparquet::PageEncodingStats& enc_stat : column_metadata.encoding_stats) {
if (enc_stat.page_type == tparquet::PageType::DATA_PAGE &&
(enc_stat.encoding != tparquet::Encoding::PLAIN_DICTIONARY &&
enc_stat.encoding != tparquet::Encoding::RLE_DICTIONARY) &&
enc_stat.count > 0) {
return false;
}
}
} else {
// Condition #2 above
bool has_dict_encoding = false;
bool has_nondict_encoding = false;
for (const tparquet::Encoding::type& encoding : column_metadata.encodings) {
if (encoding == tparquet::Encoding::PLAIN_DICTIONARY ||
encoding == tparquet::Encoding::RLE_DICTIONARY) {
has_dict_encoding = true;
}
// RLE and BIT_PACKED are used for repetition/definition levels
if (encoding != tparquet::Encoding::PLAIN_DICTIONARY &&
encoding != tparquet::Encoding::RLE_DICTIONARY &&
encoding != tparquet::Encoding::RLE && encoding != tparquet::Encoding::BIT_PACKED) {
has_nondict_encoding = true;
break;
}
}
// Not entirely dictionary encoded if:
// 1. No dictionary encoding listed
// OR
// 2. Some non-dictionary encoding is listed
if (!has_dict_encoding || has_nondict_encoding) {
return false;
}
}
return true;
}
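// Read the next batch. An empty read-column list means the query only needs
// path-derived columns; otherwise read the data columns, fill partition and
// missing columns, and apply the position-delete filter and the conjuncts,
// taking the lazy-read path when it is enabled.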
Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_rows,
bool* batch_eof) {
if (_is_row_group_filtered) {
*read_rows = 0;
*batch_eof = true;
return Status::OK();
}
    // Process an external table query task whose selected columns all come from the path.
if (_read_columns.empty()) {
RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof));
RETURN_IF_ERROR(
_fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
Status st = VExprContext::filter_block(_lazy_read_ctx.conjuncts, block, block->columns());
*read_rows = block->rows();
return st;
}
if (_lazy_read_ctx.can_lazy_read) {
        // _do_lazy_read loops internally when the current batch is entirely filtered out
return _do_lazy_read(block, batch_size, read_rows, batch_eof);
} else {
ColumnSelectVector run_length_vector;
RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.all_read_columns, batch_size,
read_rows, batch_eof, run_length_vector));
RETURN_IF_ERROR(
_fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
if (block->rows() == 0) {
_convert_dict_cols_to_string_cols(block);
*read_rows = block->rows();
return Status::OK();
}
RETURN_IF_ERROR(_build_pos_delete_filter(*read_rows));
        std::vector<uint32_t> columns_to_filter;
        int column_to_keep = block->columns();
        columns_to_filter.resize(column_to_keep);
        for (uint32_t i = 0; i < columns_to_filter.size(); ++i) {
            columns_to_filter[i] = i;
        }
if (!_lazy_read_ctx.conjuncts.empty()) {
std::vector<IColumn::Filter*> filters;
if (_position_delete_ctx.has_filter) {
filters.push_back(_pos_delete_filter_ptr.get());
}
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
_filter_conjuncts, &filters, block, columns_to_filter,
column_to_keep)));
_convert_dict_cols_to_string_cols(block);
} else {
RETURN_IF_CATCH_EXCEPTION(
RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter)));
}
*read_rows = block->rows();
return Status::OK();
}
}
void RowGroupReader::_merge_read_ranges(std::vector<RowRange>& row_ranges) {
_read_ranges = row_ranges;
}
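// Read up to batch_size rows of the given columns into the block. Columns
// marked for dictionary filtering are materialized as Int32 dict-code
// columns here and converted back to string columns after filtering.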
Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::string>& columns,
size_t batch_size, size_t* read_rows, bool* batch_eof,
ColumnSelectVector& select_vector) {
size_t batch_read_rows = 0;
bool has_eof = false;
for (auto& read_col_name : columns) {
auto& column_with_type_and_name = block->get_by_name(read_col_name);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
bool is_dict_filter = false;
for (auto& _dict_filter_col : _dict_filter_cols) {
if (_dict_filter_col.first == read_col_name) {
MutableColumnPtr dict_column = ColumnVector<Int32>::create();
size_t pos = block->get_position_by_name(read_col_name);
if (column_type->is_nullable()) {
block->get_by_position(pos).type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt32>());
block->replace_by_position(
pos,
ColumnNullable::create(std::move(dict_column),
ColumnUInt8::create(dict_column->size(), 0)));
} else {
block->get_by_position(pos).type = std::make_shared<DataTypeInt32>();
block->replace_by_position(pos, std::move(dict_column));
}
is_dict_filter = true;
break;
}
}
size_t col_read_rows = 0;
bool col_eof = false;
        // Reset _filter_map_index to 0 before reading the next column.
select_vector.reset();
while (!col_eof && col_read_rows < batch_size) {
size_t loop_rows = 0;
RETURN_IF_ERROR(_column_readers[read_col_name]->read_column_data(
column_ptr, column_type, select_vector, batch_size - col_read_rows, &loop_rows,
&col_eof, is_dict_filter));
col_read_rows += loop_rows;
}
if (batch_read_rows > 0 && batch_read_rows != col_read_rows) {
return Status::Corruption("Can't read the same number of rows among parquet columns");
}
batch_read_rows = col_read_rows;
if (col_eof) {
has_eof = true;
}
}
*read_rows = batch_read_rows;
*batch_eof = has_eof;
return Status::OK();
}
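// Lazy read: first read only the predicate columns and evaluate the filter
// conjuncts on them. Batches that are filtered out completely are cached and
// skipped, so the remaining (lazy) columns are decoded only for rows that
// can survive the filter.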
Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* read_rows,
bool* batch_eof) {
    std::unique_ptr<ColumnSelectVector> select_vector_ptr = nullptr;
    size_t pre_read_rows = 0;
    bool pre_eof = false;
size_t origin_column_num = block->columns();
IColumn::Filter result_filter;
while (true) {
// read predicate columns
pre_read_rows = 0;
pre_eof = false;
ColumnSelectVector run_length_vector;
RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns.first, batch_size,
&pre_read_rows, &pre_eof, run_length_vector));
if (pre_read_rows == 0) {
DCHECK_EQ(pre_eof, true);
break;
}
RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows,
_lazy_read_ctx.predicate_partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows,
_lazy_read_ctx.predicate_missing_columns));
RETURN_IF_ERROR(_build_pos_delete_filter(pre_read_rows));
// generate filter vector
if (_lazy_read_ctx.resize_first_column) {
            // VExprContext::execute has an optimization: the filter only runs when
            // block->rows() > 0, so resize the first column to expose the row count.
            // The following process may be tricky and time-consuming, but we have no
            // other way.
block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows);
}
result_filter.assign(pre_read_rows, static_cast<unsigned char>(1));
bool can_filter_all = false;
std::vector<IColumn::Filter*> filters;
if (_position_delete_ctx.has_filter) {
filters.push_back(_pos_delete_filter_ptr.get());
}
VExprContextSPtrs filter_contexts;
for (auto& conjunct : _filter_conjuncts) {
filter_contexts.emplace_back(conjunct);
}
RETURN_IF_ERROR(VExprContext::execute_conjuncts(filter_contexts, &filters, block,
&result_filter, &can_filter_all));
if (_lazy_read_ctx.resize_first_column) {
            // Clear the first column again so the real data can be inserted.
block->get_by_position(0).column->assume_mutable()->clear();
}
const uint8_t* __restrict filter_map = result_filter.data();
select_vector_ptr.reset(new ColumnSelectVector(filter_map, pre_read_rows, can_filter_all));
if (select_vector_ptr->filter_all()) {
for (auto& col : _lazy_read_ctx.predicate_columns.first) {
// clean block to read predicate columns
block->get_by_name(col).column->assume_mutable()->clear();
}
for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
block->get_by_name(col.first).column->assume_mutable()->clear();
}
for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
block->get_by_name(col.first).column->assume_mutable()->clear();
}
Block::erase_useless_column(block, origin_column_num);
if (!pre_eof) {
// If continuous batches are skipped, we can cache them to skip a whole page
_cached_filtered_rows += pre_read_rows;
} else { // pre_eof
// If select_vector_ptr->filter_all() and pre_eof, we can skip whole row group.
*read_rows = 0;
*batch_eof = true;
_lazy_read_filtered_rows += pre_read_rows;
_convert_dict_cols_to_string_cols(block);
return Status::OK();
}
} else {
break;
}
}
    if (select_vector_ptr == nullptr) {
        DCHECK_EQ(pre_read_rows + _cached_filtered_rows, 0);
        *read_rows = 0;
        *batch_eof = true;
        _convert_dict_cols_to_string_cols(block);
        return Status::OK();
    }
ColumnSelectVector& select_vector = *select_vector_ptr;
std::unique_ptr<uint8_t[]> rebuild_filter_map = nullptr;
if (_cached_filtered_rows != 0) {
_rebuild_select_vector(select_vector, rebuild_filter_map, pre_read_rows);
pre_read_rows += _cached_filtered_rows;
_cached_filtered_rows = 0;
}
// lazy read columns
size_t lazy_read_rows;
bool lazy_eof;
RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.lazy_read_columns, pre_read_rows,
&lazy_read_rows, &lazy_eof, select_vector));
if (pre_read_rows != lazy_read_rows) {
return Status::Corruption("Can't read the same number of rows when doing lazy read");
}
    // pre_eof may differ from lazy_eof: pre_read_rows is used as the batch size when
    // reading the lazy columns, so the two reads can reach end-of-group differently.
    // Filter the data in the predicate columns, then drop the helper filter column.
if (select_vector.has_filter()) {
if (block->columns() == origin_column_num) {
            // The whole row group has been filtered out by _lazy_read_ctx.vconjunct_ctx,
            // and batch_eof came from the next batch, so the filter columns were already
            // removed ahead of time.
DCHECK_EQ(block->rows(), 0);
} else {
RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
block, _lazy_read_ctx.all_predicate_col_ids, result_filter));
Block::erase_useless_column(block, origin_column_num);
}
} else {
Block::erase_useless_column(block, origin_column_num);
}
_convert_dict_cols_to_string_cols(block);
size_t column_num = block->columns();
size_t column_size = 0;
    for (size_t i = 0; i < column_num; ++i) {
size_t cz = block->get_by_position(i).column->size();
if (column_size != 0 && cz != 0) {
DCHECK_EQ(column_size, cz);
}
if (cz != 0) {
column_size = cz;
}
}
_lazy_read_filtered_rows += pre_read_rows - column_size;
*read_rows = column_size;
*batch_eof = pre_eof;
RETURN_IF_ERROR(_fill_partition_columns(block, column_size, _lazy_read_ctx.partition_columns));
return _fill_missing_columns(block, column_size, _lazy_read_ctx.missing_columns);
}
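// Prepend the cached filtered rows (all zeros) to the current filter map so
// that the lazy columns skip them together with the rows filtered out in
// this batch.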
void RowGroupReader::_rebuild_select_vector(ColumnSelectVector& select_vector,
std::unique_ptr<uint8_t[]>& filter_map,
size_t pre_read_rows) const {
if (_cached_filtered_rows == 0) {
return;
}
size_t total_rows = _cached_filtered_rows + pre_read_rows;
if (select_vector.filter_all()) {
select_vector.build(nullptr, total_rows, true);
return;
}
uint8_t* map = new uint8_t[total_rows];
filter_map.reset(map);
for (size_t i = 0; i < _cached_filtered_rows; ++i) {
map[i] = 0;
}
const uint8_t* old_map = select_vector.filter_map();
if (old_map == nullptr) {
        // A null filter map with filter_all == false means every pre-read row was
        // selected, so mark them all as kept.
for (size_t i = _cached_filtered_rows; i < total_rows; ++i) {
map[i] = 1;
}
} else {
memcpy(map + _cached_filtered_rows, old_map, pre_read_rows);
}
select_vector.build(map, total_rows, false);
}
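// Fill each partition column with the constant value parsed from the file
// path, repeated `rows` times.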
Status RowGroupReader::_fill_partition_columns(
Block* block, size_t rows,
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
partition_columns) {
for (auto& kv : partition_columns) {
auto doris_column = block->get_by_name(kv.first).column;
IColumn* col_ptr = const_cast<IColumn*>(doris_column.get());
auto& [value, slot_desc] = kv.second;
if (!_text_converter->write_vec_column(slot_desc, col_ptr, const_cast<char*>(value.c_str()),
value.size(), true, false, rows)) {
return Status::InternalError("Failed to fill partition column: {}={}",
slot_desc->col_name(), value);
}
}
return Status::OK();
}
Status RowGroupReader::_fill_missing_columns(
Block* block, size_t rows,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
for (auto& kv : missing_columns) {
if (kv.second == nullptr) {
// no default column, fill with null
auto nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(
(*std::move(block->get_by_name(kv.first).column)).mutate().get());
nullable_column->insert_many_defaults(rows);
} else {
// fill with default value
auto& ctx = kv.second;
auto origin_column_num = block->columns();
int result_column_id = -1;
// PT1 => dest primitive type
RETURN_IF_ERROR(ctx->execute(block, &result_column_id));
bool is_origin_column = result_column_id < origin_column_num;
if (!is_origin_column) {
                // Call resize because the first column of the block may not be filled by
                // the reader, so block->rows() may return a wrong result, causing the
                // column created by `ctx->execute()` to have only one row.
std::move(*block->get_by_position(result_column_id).column).mutate()->resize(rows);
auto result_column_ptr = block->get_by_position(result_column_id).column;
// result_column_ptr maybe a ColumnConst, convert it to a normal column
result_column_ptr = result_column_ptr->convert_to_full_column_if_const();
auto origin_column_type = block->get_by_name(kv.first).type;
bool is_nullable = origin_column_type->is_nullable();
block->replace_by_position(
block->get_position_by_name(kv.first),
is_nullable ? make_nullable(result_column_ptr) : result_column_ptr);
block->erase(result_column_id);
}
}
}
return Status::OK();
}
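// Advance the row cursor without decoding any column data, honoring position
// deletes; used when the query selects no column stored in the file.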
Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof) {
if (_position_delete_ctx.has_filter) {
int64_t start_row_id = _position_delete_ctx.current_row_id;
int64_t end_row_id = std::min(_position_delete_ctx.current_row_id + (int64_t)batch_size,
_position_delete_ctx.last_row_id);
int64_t num_delete_rows = 0;
while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
const int64_t& delete_row_id =
_position_delete_ctx.delete_rows[_position_delete_ctx.index];
if (delete_row_id < start_row_id) {
_position_delete_ctx.index++;
} else if (delete_row_id < end_row_id) {
num_delete_rows++;
_position_delete_ctx.index++;
} else { // delete_row_id >= end_row_id
break;
}
}
*read_rows = end_row_id - start_row_id - num_delete_rows;
_position_delete_ctx.current_row_id = end_row_id;
*batch_eof = _position_delete_ctx.current_row_id == _position_delete_ctx.last_row_id;
} else {
if (batch_size < _remaining_rows) {
*read_rows = batch_size;
_remaining_rows -= batch_size;
*batch_eof = false;
} else {
*read_rows = _remaining_rows;
_remaining_rows = 0;
*batch_eof = true;
}
}
return Status::OK();
}
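// Build a 0/1 filter for the position-delete rows of the current batch by
// mapping each deleted row id of the row group onto its index within the
// read ranges covered by this batch.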
Status RowGroupReader::_build_pos_delete_filter(size_t read_rows) {
if (!_position_delete_ctx.has_filter) {
_pos_delete_filter_ptr.reset(nullptr);
_total_read_rows += read_rows;
return Status::OK();
}
_pos_delete_filter_ptr.reset(new IColumn::Filter(read_rows, 1));
auto* __restrict _pos_delete_filter_data = _pos_delete_filter_ptr->data();
while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
const int64_t delete_row_index_in_row_group =
_position_delete_ctx.delete_rows[_position_delete_ctx.index] -
_position_delete_ctx.first_row_id;
int64_t read_range_rows = 0;
size_t remaining_read_rows = _total_read_rows + read_rows;
for (auto& range : _read_ranges) {
if (delete_row_index_in_row_group < range.first_row) {
++_position_delete_ctx.index;
break;
} else if (delete_row_index_in_row_group < range.last_row) {
int64_t index = (delete_row_index_in_row_group - range.first_row) +
read_range_rows - _total_read_rows;
                if (index >= static_cast<int64_t>(read_rows)) {
_total_read_rows += read_rows;
return Status::OK();
}
_pos_delete_filter_data[index] = 0;
++_position_delete_ctx.index;
break;
} else { // delete_row >= range.last_row
}
int64_t range_size = range.last_row - range.first_row;
            // Don't search the next range when no rows of this read remain.
if (remaining_read_rows <= range_size) {
_total_read_rows += read_rows;
return Status::OK();
} else {
remaining_read_rows -= range_size;
read_range_rows += range_size;
}
}
}
_total_read_rows += read_rows;
return Status::OK();
}
// need exception safety
Status RowGroupReader::_filter_block(Block* block, int column_to_keep,
const std::vector<uint32_t>& columns_to_filter) {
if (_pos_delete_filter_ptr) {
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, (*_pos_delete_filter_ptr)));
}
Block::erase_useless_column(block, column_to_keep);
return Status::OK();
}
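// For each dictionary-filterable column: read its dictionary values into a
// temporary block, run the column's conjuncts against those values, and
// rewrite the surviving entries into a predicate on the Int32 dict codes.
// If no value survives, the whole row group can be skipped; if too many
// survive, fall back to the original conjuncts.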
Status RowGroupReader::_rewrite_dict_predicates() {
for (auto it = _dict_filter_cols.begin(); it != _dict_filter_cols.end();) {
std::string& dict_filter_col_name = it->first;
int slot_id = it->second;
// 1. Get dictionary values to a string column.
MutableColumnPtr dict_value_column = ColumnString::create();
bool has_dict = false;
RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->read_dict_values_to_column(
dict_value_column, &has_dict));
size_t dict_value_column_size = dict_value_column->size();
DCHECK(has_dict);
// 2. Build a temp block from the dict string column, then execute conjuncts and filter block.
// 2.1 Build a temp block from the dict string column to match the conjuncts executing.
Block temp_block;
int dict_pos = -1;
int index = 0;
for (const auto slot_desc : _tuple_descriptor->slots()) {
if (!slot_desc->need_materialize()) {
                // non-materialized slots are skipped when reading
continue;
}
if (slot_desc->id() == slot_id) {
auto data_type = slot_desc->get_data_type_ptr();
if (data_type->is_nullable()) {
temp_block.insert(
{ColumnNullable::create(std::move(dict_value_column),
ColumnUInt8::create(dict_value_column_size, 0)),
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>()),
""});
} else {
temp_block.insert(
{std::move(dict_value_column), std::make_shared<DataTypeString>(), ""});
}
dict_pos = index;
} else {
temp_block.insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
}
++index;
}
// 2.2 Execute conjuncts and filter block.
VExprContextSPtrs ctxs;
auto iter = _slot_id_to_filter_conjuncts->find(slot_id);
if (iter != _slot_id_to_filter_conjuncts->end()) {
for (auto& ctx : iter->second) {
ctxs.push_back(ctx);
}
        } else {
            return Status::NotFound("_slot_id_to_filter_conjuncts: slot_id [{}] not found",
                                    slot_id);
        }
std::vector<uint32_t> columns_to_filter(1, dict_pos);
int column_to_keep = temp_block.columns();
if (dict_pos != 0) {
            // VExprContext::execute has an optimization: the filter only runs when
            // block->rows() > 0, so resize the first column to expose the row count.
            // The following process may be tricky and time-consuming, but we have no
            // other way.
temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
}
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts_and_filter_block(
ctxs, nullptr, &temp_block, columns_to_filter, column_to_keep));
if (dict_pos != 0) {
            // Clear the first column again so the real data can be inserted.
temp_block.get_by_position(0).column->assume_mutable()->clear();
}
// Check some conditions.
ColumnPtr& dict_column = temp_block.get_by_position(dict_pos).column;
        // If dict_column->size() == 0, the whole row group can be filtered.
if (dict_column->size() == 0) {
_is_row_group_filtered = true;
return Status::OK();
}
        // Performance: if dict_column is too large, the rewrite would generate a huge
        // IN filter, so fall back to the original conjuncts instead.
if (dict_column->size() > MAX_DICT_CODE_PREDICATE_TO_REWRITE) {
it = _dict_filter_cols.erase(it);
for (auto& ctx : ctxs) {
_filter_conjuncts.push_back(ctx);
}
continue;
}
// 3. Get dict codes.
std::vector<int32_t> dict_codes;
        if (dict_column->is_nullable()) {
            const auto* nullable_column = assert_cast<const ColumnNullable*>(dict_column.get());
            const auto* nested_column = assert_cast<const ColumnString*>(
                    nullable_column->get_nested_column_ptr().get());
            RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->get_dict_codes(nested_column,
                                                                                  &dict_codes));
} else {
RETURN_IF_ERROR(_column_readers[dict_filter_col_name]->get_dict_codes(
assert_cast<const ColumnString*>(dict_column.get()), &dict_codes));
}
// 4. Rewrite conjuncts.
        RETURN_IF_ERROR(_rewrite_dict_conjuncts(dict_codes, slot_id, dict_column->is_nullable()));
++it;
}
return Status::OK();
}
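// Build the rewritten conjunct on the dict-code column: a single surviving
// code becomes an `eq` binary predicate, multiple codes become an IN
// predicate backed by a hybrid set of the codes.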
Status RowGroupReader::_rewrite_dict_conjuncts(std::vector<int32_t>& dict_codes, int slot_id,
bool is_nullable) {
VExprSPtr root;
if (dict_codes.size() == 1) {
{
TFunction fn;
TFunctionName fn_name;
fn_name.__set_db_name("");
fn_name.__set_function_name("eq");
fn.__set_name(fn_name);
fn.__set_binary_type(TFunctionBinaryType::BUILTIN);
std::vector<TTypeDesc> arg_types;
arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
arg_types.push_back(create_type_desc(PrimitiveType::TYPE_INT));
fn.__set_arg_types(arg_types);
fn.__set_ret_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
fn.__set_has_var_args(false);
TExprNode texpr_node;
texpr_node.__set_type(create_type_desc(PrimitiveType::TYPE_BOOLEAN));
texpr_node.__set_node_type(TExprNodeType::BINARY_PRED);
texpr_node.__set_opcode(TExprOpcode::EQ);
texpr_node.__set_vector_opcode(TExprOpcode::EQ);
texpr_node.__set_fn(fn);
texpr_node.__set_child_type(TPrimitiveType::INT);
texpr_node.__set_num_children(2);
texpr_node.__set_is_nullable(is_nullable);
root = VectorizedFnCall::create_shared(texpr_node);
}
{
SlotDescriptor* slot = nullptr;
const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
for (auto each : slots) {
if (each->id() == slot_id) {
slot = each;
break;
}
}
root->add_child(VSlotRef::create_shared(slot));
}
{
TExprNode texpr_node;
texpr_node.__set_node_type(TExprNodeType::INT_LITERAL);
texpr_node.__set_type(create_type_desc(TYPE_INT));
TIntLiteral int_literal;
int_literal.__set_value(dict_codes[0]);
texpr_node.__set_int_literal(int_literal);
texpr_node.__set_is_nullable(is_nullable);
root->add_child(VLiteral::create_shared(texpr_node));
}
} else {
{
TTypeDesc type_desc = create_type_desc(PrimitiveType::TYPE_BOOLEAN);
TExprNode node;
node.__set_type(type_desc);
node.__set_node_type(TExprNodeType::IN_PRED);
node.in_predicate.__set_is_not_in(false);
node.__set_opcode(TExprOpcode::FILTER_IN);
node.__isset.vector_opcode = true;
node.__set_vector_opcode(TExprOpcode::FILTER_IN);
            // VDirectInPredicate assumes is_nullable = false.
node.__set_is_nullable(false);
root = vectorized::VDirectInPredicate::create_shared(node);
std::shared_ptr<HybridSetBase> hybrid_set(
create_set(PrimitiveType::TYPE_INT, dict_codes.size()));
            for (size_t j = 0; j < dict_codes.size(); ++j) {
hybrid_set->insert(&dict_codes[j]);
}
static_cast<vectorized::VDirectInPredicate*>(root.get())->set_filter(hybrid_set);
}
{
SlotDescriptor* slot = nullptr;
const std::vector<SlotDescriptor*>& slots = _tuple_descriptor->slots();
for (auto each : slots) {
if (each->id() == slot_id) {
slot = each;
break;
}
}
root->add_child(VSlotRef::create_shared(slot));
}
}
VExprContextSPtr rewritten_conjunct_ctx = VExprContext::create_shared(root);
RETURN_IF_ERROR(rewritten_conjunct_ctx->prepare(_state, *_row_descriptor));
RETURN_IF_ERROR(rewritten_conjunct_ctx->open(_state));
_dict_filter_conjuncts.push_back(rewritten_conjunct_ctx);
_filter_conjuncts.push_back(rewritten_conjunct_ctx);
return Status::OK();
}
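// After filtering, convert the Int32 dict-code columns back to the string
// columns the rest of the plan expects, preserving nullability.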
void RowGroupReader::_convert_dict_cols_to_string_cols(Block* block) {
for (auto& dict_filter_cols : _dict_filter_cols) {
size_t pos = block->get_position_by_name(dict_filter_cols.first);
ColumnWithTypeAndName& column_with_type_and_name = block->get_by_position(pos);
const ColumnPtr& column = column_with_type_and_name.column;
if (auto* nullable_column = check_and_get_column<ColumnNullable>(*column)) {
const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
const ColumnInt32* dict_column = assert_cast<const ColumnInt32*>(nested_column.get());
DCHECK(dict_column);
MutableColumnPtr string_column =
_column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
dict_column);
column_with_type_and_name.type =
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeString>());
block->replace_by_position(
pos, ColumnNullable::create(std::move(string_column),
nullable_column->get_null_map_column_ptr()));
} else {
const ColumnInt32* dict_column = assert_cast<const ColumnInt32*>(column.get());
MutableColumnPtr string_column =
_column_readers[dict_filter_cols.first]->convert_dict_column_to_string_column(
dict_column);
column_with_type_and_name.type = std::make_shared<DataTypeString>();
block->replace_by_position(pos, std::move(string_column));
}
}
}
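// Merge the read statistics of all column readers in this row group.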
ParquetColumnReader::Statistics RowGroupReader::statistics() {
ParquetColumnReader::Statistics st;
for (auto& reader : _column_readers) {
auto ost = reader.second->statistics();
st.merge(ost);
}
return st;
}
} // namespace doris::vectorized