// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/rowset/segment_v2/column_writer.h"
#include <gen_cpp/segment_v2.pb.h>
#include <algorithm>
#include <cstring>
#include <filesystem>
#include <memory>
#include "common/config.h"
#include "common/logging.h"
#include "io/fs/file_writer.h"
#include "olap/olap_common.h"
#include "olap/rowset/segment_v2/bloom_filter_index_writer.h"
#include "olap/rowset/segment_v2/encoding_info.h"
#include "olap/rowset/segment_v2/inverted_index_writer.h"
#include "olap/rowset/segment_v2/options.h"
#include "olap/rowset/segment_v2/ordinal_page_index.h"
#include "olap/rowset/segment_v2/page_builder.h"
#include "olap/rowset/segment_v2/page_io.h"
#include "olap/rowset/segment_v2/page_pointer.h"
#include "olap/rowset/segment_v2/variant/variant_column_writer_impl.h"
#include "olap/rowset/segment_v2/zone_map_index.h"
#include "olap/tablet_schema.h"
#include "olap/types.h"
#include "runtime/collection_value.h"
#include "util/block_compression.h"
#include "util/debug_points.h"
#include "util/faststring.h"
#include "util/rle_encoding.h"
#include "util/simd/bits.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type_agg_state.h"
#include "vec/data_types/data_type_factory.hpp"
namespace doris::segment_v2 {
#include "common/compile_check_begin.h"
class NullBitmapBuilder {
public:
NullBitmapBuilder() : _has_null(false), _bitmap_buf(512), _rle_encoder(&_bitmap_buf, 1) {}
explicit NullBitmapBuilder(size_t reserve_bits)
: _has_null(false),
_bitmap_buf(BitmapSize(reserve_bits)),
_rle_encoder(&_bitmap_buf, 1) {}
void reserve_for_write(size_t num_rows, size_t non_null_count) {
if (num_rows == 0) {
return;
}
if (non_null_count == 0 || (non_null_count == num_rows && !_has_null)) {
if (_bitmap_buf.capacity() < kSmallReserveBytes) {
_bitmap_buf.reserve(kSmallReserveBytes);
}
return;
}
size_t raw_bytes = BitmapSize(num_rows);
size_t run_est = std::min(num_rows, non_null_count * 2 + 1);
size_t run_bytes_est = run_est * kBytesPerRun + kReserveSlackBytes;
size_t raw_overhead = raw_bytes / 63 + 1;
size_t raw_est = raw_bytes + raw_overhead + kReserveSlackBytes;
size_t reserve_bytes = std::min(raw_est, run_bytes_est);
if (_bitmap_buf.capacity() < reserve_bytes) {
_bitmap_buf.reserve(reserve_bytes);
}
}
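// Worked example for reserve_for_write() above (illustrative only; assumes
// BitmapSize(n) == (n + 7) / 8):
//   num_rows = 1000, non_null_count = 10
//   run_est       = min(1000, 10 * 2 + 1)     = 21
//   run_bytes_est = 21 * kBytesPerRun + 16    = 142
//   raw_bytes     = BitmapSize(1000)          = 125
//   raw_est       = 125 + (125 / 63 + 1) + 16 = 143
//   reserve_bytes = min(143, 142)             = 142
// For sparse non-null data the run-length estimate wins, so we avoid
// reserving the full raw-bitmap budget.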
void add_run(bool value, size_t run) {
_has_null |= value;
_rle_encoder.Put(value, run);
}
// Returns whether the null map being built contains any null value
bool has_null() const { return _has_null; }
Status finish(OwnedSlice* slice) {
_rle_encoder.Flush();
RETURN_IF_CATCH_EXCEPTION({ *slice = _bitmap_buf.build(); });
return Status::OK();
}
void reset() {
_has_null = false;
_rle_encoder.Clear();
}
uint64_t size() { return _bitmap_buf.size(); }
private:
static constexpr size_t kSmallReserveBytes = 64;
static constexpr size_t kReserveSlackBytes = 16;
static constexpr size_t kBytesPerRun = 6;
bool _has_null;
faststring _bitmap_buf;
RleEncoder<bool> _rle_encoder;
};
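// Illustrative usage sketch for NullBitmapBuilder (not part of the build; the
// run lengths below are made up). A column writer feeds it boolean runs and
// only serializes the RLE bitmap when the page actually contains nulls:
//
//   NullBitmapBuilder builder;
//   builder.add_run(false, 100);                   // 100 non-null rows
//   builder.add_run(true, 4);                      // 4 null rows
//   OwnedSlice nullmap;
//   if (builder.has_null()) {
//       RETURN_IF_ERROR(builder.finish(&nullmap)); // flush the RLE encoder
//   }
//   builder.reset();                               // reuse for the next page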
inline ScalarColumnWriter* get_null_writer(const ColumnWriterOptions& opts,
io::FileWriter* file_writer, uint32_t id) {
if (!opts.meta->is_nullable()) {
return nullptr;
}
FieldType null_type = FieldType::OLAP_FIELD_TYPE_TINYINT;
ColumnWriterOptions null_options;
null_options.meta = opts.meta->add_children_columns();
null_options.meta->set_column_id(id);
null_options.meta->set_unique_id(id);
null_options.meta->set_type(int(null_type));
null_options.meta->set_is_nullable(false);
null_options.meta->set_length(
cast_set<int32_t>(get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_TINYINT>()->size()));
null_options.meta->set_encoding(DEFAULT_ENCODING);
null_options.meta->set_compression(opts.meta->compression());
null_options.need_zone_map = false;
null_options.need_bloom_filter = false;
null_options.encoding_preference = opts.encoding_preference;
TabletColumn null_column =
TabletColumn(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, null_type, false,
null_options.meta->unique_id(), null_options.meta->length());
null_column.set_name("nullable");
null_column.set_index_length(-1); // no short key index
std::unique_ptr<Field> null_field(FieldFactory::create(null_column));
return new ScalarColumnWriter(null_options, std::move(null_field), file_writer);
}
Status ColumnWriter::create_struct_writer(const ColumnWriterOptions& opts,
const TabletColumn* column, io::FileWriter* file_writer,
std::unique_ptr<ColumnWriter>* writer) {
// empty structs are not supported
DCHECK(column->get_subtype_count() >= 1);
std::vector<std::unique_ptr<ColumnWriter>> sub_column_writers;
sub_column_writers.reserve(column->get_subtype_count());
for (uint32_t i = 0; i < column->get_subtype_count(); i++) {
const TabletColumn& sub_column = column->get_sub_column(i);
RETURN_IF_ERROR(sub_column.check_valid());
// create sub writer
ColumnWriterOptions column_options;
column_options.meta = opts.meta->mutable_children_columns(i);
column_options.need_zone_map = false;
column_options.need_bloom_filter = sub_column.is_bf_column();
column_options.encoding_preference = opts.encoding_preference;
std::unique_ptr<ColumnWriter> sub_column_writer;
RETURN_IF_ERROR(
ColumnWriter::create(column_options, &sub_column, file_writer, &sub_column_writer));
sub_column_writers.push_back(std::move(sub_column_writer));
}
ScalarColumnWriter* null_writer =
get_null_writer(opts, file_writer, column->get_subtype_count() + 1);
*writer = std::unique_ptr<ColumnWriter>(
new StructColumnWriter(opts, std::unique_ptr<Field>(FieldFactory::create(*column)),
null_writer, sub_column_writers));
return Status::OK();
}
Status ColumnWriter::create_array_writer(const ColumnWriterOptions& opts,
const TabletColumn* column, io::FileWriter* file_writer,
std::unique_ptr<ColumnWriter>* writer) {
DCHECK(column->get_subtype_count() == 1);
const TabletColumn& item_column = column->get_sub_column(0);
RETURN_IF_ERROR(item_column.check_valid());
// create item writer
ColumnWriterOptions item_options;
item_options.meta = opts.meta->mutable_children_columns(0);
item_options.need_zone_map = false;
item_options.need_bloom_filter = item_column.is_bf_column();
item_options.encoding_preference = opts.encoding_preference;
std::unique_ptr<ColumnWriter> item_writer;
RETURN_IF_ERROR(ColumnWriter::create(item_options, &item_column, file_writer, &item_writer));
// create length writer
FieldType length_type = FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT;
ColumnWriterOptions length_options;
length_options.meta = opts.meta->add_children_columns();
length_options.meta->set_column_id(2);
length_options.meta->set_unique_id(2);
length_options.meta->set_type(int(length_type));
length_options.meta->set_is_nullable(false);
length_options.meta->set_length(cast_set<int32_t>(
get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT>()->size()));
length_options.meta->set_encoding(DEFAULT_ENCODING);
length_options.meta->set_compression(opts.meta->compression());
length_options.need_zone_map = false;
length_options.need_bloom_filter = false;
length_options.encoding_preference = opts.encoding_preference;
TabletColumn length_column =
TabletColumn(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, length_type,
length_options.meta->is_nullable(), length_options.meta->unique_id(),
length_options.meta->length());
length_column.set_name("length");
length_column.set_index_length(-1); // no short key index
std::unique_ptr<Field> bigint_field(FieldFactory::create(length_column));
auto* length_writer =
new OffsetColumnWriter(length_options, std::move(bigint_field), file_writer);
ScalarColumnWriter* null_writer = get_null_writer(opts, file_writer, 3);
*writer = std::unique_ptr<ColumnWriter>(
new ArrayColumnWriter(opts, std::unique_ptr<Field>(FieldFactory::create(*column)),
length_writer, null_writer, std::move(item_writer)));
return Status::OK();
}
Status ColumnWriter::create_map_writer(const ColumnWriterOptions& opts, const TabletColumn* column,
io::FileWriter* file_writer,
std::unique_ptr<ColumnWriter>* writer) {
DCHECK(column->get_subtype_count() == 2);
if (column->get_subtype_count() < 2) {
return Status::InternalError(
"If you upgraded from version 1.2.*, please DROP the MAP columns and then "
"ADD the MAP columns back.");
}
// create key & value writer
std::vector<std::unique_ptr<ColumnWriter>> inner_writer_list;
for (int i = 0; i < 2; ++i) {
const TabletColumn& item_column = column->get_sub_column(i);
RETURN_IF_ERROR(item_column.check_valid());
// create item writer
ColumnWriterOptions item_options;
item_options.meta = opts.meta->mutable_children_columns(i);
item_options.need_zone_map = false;
item_options.need_bloom_filter = item_column.is_bf_column();
item_options.encoding_preference = opts.encoding_preference;
std::unique_ptr<ColumnWriter> item_writer;
RETURN_IF_ERROR(
ColumnWriter::create(item_options, &item_column, file_writer, &item_writer));
inner_writer_list.push_back(std::move(item_writer));
}
// create offset writer
FieldType length_type = FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT;
// Be Cautious: column unique id is used for column reader creation
ColumnWriterOptions length_options;
length_options.meta = opts.meta->add_children_columns();
length_options.meta->set_column_id(column->get_subtype_count() + 1);
length_options.meta->set_unique_id(column->get_subtype_count() + 1);
length_options.meta->set_type(int(length_type));
length_options.meta->set_is_nullable(false);
length_options.meta->set_length(cast_set<int32_t>(
get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT>()->size()));
length_options.meta->set_encoding(DEFAULT_ENCODING);
length_options.meta->set_compression(opts.meta->compression());
length_options.need_zone_map = false;
length_options.need_bloom_filter = false;
length_options.encoding_preference = opts.encoding_preference;
TabletColumn length_column =
TabletColumn(FieldAggregationMethod::OLAP_FIELD_AGGREGATION_NONE, length_type,
length_options.meta->is_nullable(), length_options.meta->unique_id(),
length_options.meta->length());
length_column.set_name("length");
length_column.set_index_length(-1); // no short key index
std::unique_ptr<Field> bigint_field(FieldFactory::create(length_column));
auto* length_writer =
new OffsetColumnWriter(length_options, std::move(bigint_field), file_writer);
ScalarColumnWriter* null_writer =
get_null_writer(opts, file_writer, column->get_subtype_count() + 2);
*writer = std::unique_ptr<ColumnWriter>(
new MapColumnWriter(opts, std::unique_ptr<Field>(FieldFactory::create(*column)),
null_writer, length_writer, inner_writer_list));
return Status::OK();
}
Status ColumnWriter::create_agg_state_writer(const ColumnWriterOptions& opts,
const TabletColumn* column,
io::FileWriter* file_writer,
std::unique_ptr<ColumnWriter>* writer) {
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(*column);
const auto* agg_state_type = assert_cast<const vectorized::DataTypeAggState*>(data_type.get());
auto type = agg_state_type->get_serialized_type()->get_primitive_type();
if (type == PrimitiveType::TYPE_STRING || type == PrimitiveType::INVALID_TYPE ||
type == PrimitiveType::TYPE_FIXED_LENGTH_OBJECT || type == PrimitiveType::TYPE_BITMAP) {
*writer = std::unique_ptr<ColumnWriter>(new ScalarColumnWriter(
opts, std::unique_ptr<Field>(FieldFactory::create(*column)), file_writer));
} else if (type == PrimitiveType::TYPE_ARRAY) {
RETURN_IF_ERROR(create_array_writer(opts, column, file_writer, writer));
} else if (type == PrimitiveType::TYPE_MAP) {
RETURN_IF_ERROR(create_map_writer(opts, column, file_writer, writer));
} else {
throw Exception(ErrorCode::INTERNAL_ERROR,
"OLAP_FIELD_TYPE_AGG_STATE meet unsupported type: {}",
agg_state_type->get_name());
}
return Status::OK();
}
Status ColumnWriter::create_variant_writer(const ColumnWriterOptions& opts,
const TabletColumn* column, io::FileWriter* file_writer,
std::unique_ptr<ColumnWriter>* writer) {
// Variant extracted columns have two kinds of physical writers:
// - Doc-value snapshot column (`...__DORIS_VARIANT_DOC_VALUE__...`): use `VariantDocCompactWriter`
// to store the doc snapshot in a compact binary form.
// - Regular extracted subcolumns: use `VariantSubcolumnWriter`.
// The root VARIANT column itself uses `VariantColumnWriter`.
if (column->is_extracted_column()) {
if (column->name().find(DOC_VALUE_COLUMN_PATH) != std::string::npos) {
*writer = std::make_unique<VariantDocCompactWriter>(
opts, column, std::unique_ptr<Field>(FieldFactory::create(*column)));
return Status::OK();
}
VLOG_DEBUG << "gen subwriter for " << column->path_info_ptr()->get_path();
*writer = std::make_unique<VariantSubcolumnWriter>(
opts, column, std::unique_ptr<Field>(FieldFactory::create(*column)));
return Status::OK();
}
*writer = std::make_unique<VariantColumnWriter>(
opts, column, std::unique_ptr<Field>(FieldFactory::create(*column)));
return Status::OK();
}
// TODO(Amory): simplify this function according to whether the column is nullable
// and needs offset/sub-column writers
Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn* column,
io::FileWriter* file_writer, std::unique_ptr<ColumnWriter>* writer) {
std::unique_ptr<Field> field(FieldFactory::create(*column));
DCHECK(field.get() != nullptr);
if (is_scalar_type(column->type())) {
*writer = std::unique_ptr<ColumnWriter>(
new ScalarColumnWriter(opts, std::move(field), file_writer));
return Status::OK();
} else {
switch (column->type()) {
case FieldType::OLAP_FIELD_TYPE_AGG_STATE: {
RETURN_IF_ERROR(create_agg_state_writer(opts, column, file_writer, writer));
return Status::OK();
}
case FieldType::OLAP_FIELD_TYPE_STRUCT: {
RETURN_IF_ERROR(create_struct_writer(opts, column, file_writer, writer));
return Status::OK();
}
case FieldType::OLAP_FIELD_TYPE_ARRAY: {
RETURN_IF_ERROR(create_array_writer(opts, column, file_writer, writer));
return Status::OK();
}
case FieldType::OLAP_FIELD_TYPE_MAP: {
RETURN_IF_ERROR(create_map_writer(opts, column, file_writer, writer));
return Status::OK();
}
case FieldType::OLAP_FIELD_TYPE_VARIANT: {
// Variant columns (which may carry sparse sub-columns) are handled by the variant writer
RETURN_IF_ERROR(create_variant_writer(opts, column, file_writer, writer));
return Status::OK();
}
default:
return Status::NotSupported("unsupported type for ColumnWriter: {}",
std::to_string(int(field->type())));
}
}
}
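// Minimal lifecycle sketch for a writer produced by ColumnWriter::create()
// (illustrative only; `opts`, `column` and `file_writer` are assumed to be
// prepared by the caller, and the optional index flushes are elided):
//
//   std::unique_ptr<ColumnWriter> writer;
//   RETURN_IF_ERROR(ColumnWriter::create(opts, column, file_writer, &writer));
//   RETURN_IF_ERROR(writer->init());
//   RETURN_IF_ERROR(writer->append(nullmap, data, num_rows)); // repeat per batch
//   RETURN_IF_ERROR(writer->finish());
//   RETURN_IF_ERROR(writer->write_data());
//   RETURN_IF_ERROR(writer->write_ordinal_index());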
Status ColumnWriter::append_nullable(const uint8_t* is_null_bits, const void* data,
size_t num_rows) {
const auto* ptr = (const uint8_t*)data;
BitmapIterator null_iter(is_null_bits, num_rows);
bool is_null = false;
size_t this_run = 0;
while ((this_run = null_iter.Next(&is_null)) > 0) {
if (is_null) {
RETURN_IF_ERROR(append_nulls(this_run));
} else {
RETURN_IF_ERROR(append_data(&ptr, this_run));
}
}
return Status::OK();
}
Status ColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
size_t num_rows) {
// Fast path: use SIMD to detect all-NULL or all-non-NULL columns
if (config::enable_rle_batch_put_optimization) {
size_t non_null_count =
simd::count_zero_num(reinterpret_cast<const int8_t*>(null_map), num_rows);
if (non_null_count == 0) {
// All NULL: skip run-length iteration, directly append all nulls
RETURN_IF_ERROR(append_nulls(num_rows));
*ptr += get_field()->size() * num_rows;
return Status::OK();
}
if (non_null_count == num_rows) {
// All non-NULL: skip run-length iteration, directly append all data
return append_data(ptr, num_rows);
}
}
// Mixed case or sparse optimization disabled: use run-length processing
size_t offset = 0;
auto next_run_step = [&]() {
size_t step = 1;
for (auto i = offset + 1; i < num_rows; ++i) {
if (null_map[offset] == null_map[i]) {
step++;
} else {
break;
}
}
return step;
};
do {
auto step = next_run_step();
if (null_map[offset]) {
RETURN_IF_ERROR(append_nulls(step));
*ptr += get_field()->size() * step;
} else {
// TODO:
// 1. `*ptr += get_field()->size() * step;` should be done in this function, not in append_data;
// 2. support vectorized loads for arrays and advancing the ptr offset here
RETURN_IF_ERROR(append_data(ptr, step));
}
offset += step;
} while (offset < num_rows);
return Status::OK();
}
Status ColumnWriter::append(const uint8_t* nullmap, const void* data, size_t num_rows) {
assert(data && num_rows > 0);
const auto* ptr = (const uint8_t*)data;
if (nullmap) {
return append_nullable(nullmap, &ptr, num_rows);
} else {
return append_data(&ptr, num_rows);
}
}
///////////////////////////////////////////////////////////////////////////////////
ScalarColumnWriter::ScalarColumnWriter(const ColumnWriterOptions& opts,
std::unique_ptr<Field> field, io::FileWriter* file_writer)
: ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta),
_opts(opts),
_file_writer(file_writer),
_data_size(0) {
// these opts.meta fields should be set by the client
DCHECK(opts.meta->has_column_id());
DCHECK(opts.meta->has_unique_id());
DCHECK(opts.meta->has_type());
DCHECK(opts.meta->has_length());
DCHECK(opts.meta->has_encoding());
DCHECK(opts.meta->has_compression());
DCHECK(opts.meta->has_is_nullable());
DCHECK(file_writer != nullptr);
_inverted_index_builders.resize(_opts.inverted_indexes.size());
}
ScalarColumnWriter::~ScalarColumnWriter() {
// delete all pages
_pages.clear();
}
Status ScalarColumnWriter::init() {
RETURN_IF_ERROR(get_block_compression_codec(_opts.meta->compression(), &_compress_codec));
PageBuilder* page_builder = nullptr;
RETURN_IF_ERROR(EncodingInfo::get(get_field()->type(), _opts.meta->encoding(),
_opts.encoding_preference, &_encoding_info));
_opts.meta->set_encoding(_encoding_info->encoding());
// create page builder
PageBuilderOptions opts;
opts.data_page_size = _opts.data_page_size;
opts.dict_page_size = _opts.dict_page_size;
opts.encoding_preference = _opts.encoding_preference;
RETURN_IF_ERROR(_encoding_info->create_page_builder(opts, &page_builder));
if (page_builder == nullptr) {
return Status::NotSupported("Failed to create page builder for type {} and encoding {}",
get_field()->type(), _opts.meta->encoding());
}
// Store the concrete encoding type instead of DEFAULT_ENCODING, because the
// default encoding of a data type may change in the future.
DCHECK_NE(_opts.meta->encoding(), DEFAULT_ENCODING);
VLOG_DEBUG << fmt::format(
"[verbose] scalar column writer init, column_id={}, type={}, encoding={}, "
"is_nullable={}",
_opts.meta->column_id(), get_field()->type(),
EncodingTypePB_Name(_opts.meta->encoding()), _opts.meta->is_nullable());
_page_builder.reset(page_builder);
// create ordinal builder
_ordinal_index_builder = std::make_unique<OrdinalIndexWriter>();
// create null bitmap builder
if (is_nullable()) {
_null_bitmap_builder = std::make_unique<NullBitmapBuilder>();
}
if (_opts.need_zone_map) {
RETURN_IF_ERROR(ZoneMapIndexWriter::create(get_field(), _zone_map_index_builder));
}
if (_opts.need_inverted_index) {
do {
for (size_t i = 0; i < _opts.inverted_indexes.size(); i++) {
DBUG_EXECUTE_IF("column_writer.init", {
class InvertedIndexColumnWriterEmpty final : public IndexColumnWriter {
public:
Status init() override { return Status::OK(); }
Status add_values(const std::string name, const void* values,
size_t count) override {
return Status::OK();
}
Status add_array_values(size_t field_size, const CollectionValue* values,
size_t count) override {
return Status::OK();
}
Status add_array_values(size_t field_size, const void* value_ptr,
const uint8_t* null_map, const uint8_t* offsets_ptr,
size_t count) override {
return Status::OK();
}
Status add_nulls(uint32_t count) override { return Status::OK(); }
Status add_array_nulls(const uint8_t* null_map, size_t num_rows) override {
return Status::OK();
}
Status finish() override { return Status::OK(); }
int64_t size() const override { return 0; }
void close_on_error() override {}
};
_inverted_index_builders[i] =
std::make_unique<InvertedIndexColumnWriterEmpty>();
break;
});
RETURN_IF_ERROR(IndexColumnWriter::create(get_field(), &_inverted_index_builders[i],
_opts.index_file_writer,
_opts.inverted_indexes[i]));
}
} while (false);
}
if (_opts.need_bloom_filter) {
if (_opts.is_ngram_bf_index) {
RETURN_IF_ERROR(NGramBloomFilterIndexWriterImpl::create(
BloomFilterOptions(), get_field()->type_info(), _opts.gram_size,
_opts.gram_bf_size, &_bloom_filter_index_builder));
} else {
RETURN_IF_ERROR(BloomFilterIndexWriter::create(
_opts.bf_options, get_field()->type_info(), &_bloom_filter_index_builder));
}
}
return Status::OK();
}
Status ScalarColumnWriter::append_nulls(size_t num_rows) {
_null_bitmap_builder->add_run(true, num_rows);
_next_rowid += num_rows;
if (_opts.need_zone_map) {
_zone_map_index_builder->add_nulls(cast_set<uint32_t>(num_rows));
}
if (_opts.need_inverted_index) {
for (const auto& builder : _inverted_index_builders) {
RETURN_IF_ERROR(builder->add_nulls(cast_set<uint32_t>(num_rows)));
}
}
if (_opts.need_bloom_filter) {
_bloom_filter_index_builder->add_nulls(cast_set<uint32_t>(num_rows));
}
return Status::OK();
}
// Append data to the page builder. This function guarantees that all num_rows
// are written before it returns, and advances ptr to the next data to be
// written.
Status ScalarColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
size_t remaining = num_rows;
while (remaining > 0) {
size_t num_written = remaining;
RETURN_IF_ERROR(append_data_in_current_page(ptr, &num_written));
remaining -= num_written;
if (_page_builder->is_page_full()) {
RETURN_IF_ERROR(finish_current_page());
}
}
return Status::OK();
}
Status ScalarColumnWriter::_internal_append_data_in_current_page(const uint8_t* data,
size_t* num_written) {
RETURN_IF_ERROR(_page_builder->add(data, num_written));
if (_opts.need_zone_map) {
_zone_map_index_builder->add_values(data, *num_written);
}
if (_opts.need_inverted_index) {
for (const auto& builder : _inverted_index_builders) {
RETURN_IF_ERROR(builder->add_values(get_field()->name(), data, *num_written));
}
}
if (_opts.need_bloom_filter) {
RETURN_IF_ERROR(_bloom_filter_index_builder->add_values(data, *num_written));
}
_next_rowid += *num_written;
// We must write the null bits after writing the data, because we don't know
// in advance how many rows can be written into the current page.
if (is_nullable()) {
_null_bitmap_builder->add_run(false, *num_written);
}
return Status::OK();
}
Status ScalarColumnWriter::append_data_in_current_page(const uint8_t** data, size_t* num_written) {
RETURN_IF_ERROR(append_data_in_current_page(*data, num_written));
*data += get_field()->size() * (*num_written);
return Status::OK();
}
Status ScalarColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
size_t num_rows) {
// When optimization is disabled, use base class implementation
if (!config::enable_rle_batch_put_optimization) {
return ColumnWriter::append_nullable(null_map, ptr, num_rows);
}
if (UNLIKELY(num_rows == 0)) {
return Status::OK();
}
// Build run-length encoded null runs using memchr for fast boundary detection
_null_run_buffer.clear();
if (_null_run_buffer.capacity() < num_rows) {
_null_run_buffer.reserve(std::min(num_rows, size_t(256)));
}
size_t non_null_count = 0;
size_t offset = 0;
while (offset < num_rows) {
bool is_null = null_map[offset] != 0;
size_t remaining = num_rows - offset;
const uint8_t* run_end =
static_cast<const uint8_t*>(memchr(null_map + offset, is_null ? 0 : 1, remaining));
size_t run_length = run_end != nullptr ? (run_end - (null_map + offset)) : remaining;
_null_run_buffer.push_back(NullRun {is_null, static_cast<uint32_t>(run_length)});
if (!is_null) {
non_null_count += run_length;
}
offset += run_length;
}
// Pre-allocate buffer based on estimated size
if (_null_bitmap_builder != nullptr) {
size_t current_rows = _next_rowid - _first_rowid;
size_t expected_rows = current_rows + num_rows;
size_t est_non_null = non_null_count;
if (num_rows > 0 && expected_rows > num_rows) {
est_non_null = (non_null_count * expected_rows) / num_rows;
}
_null_bitmap_builder->reserve_for_write(expected_rows, est_non_null);
}
if (non_null_count == 0) {
// All NULL: skip data writing, only update null bitmap and indexes
RETURN_IF_ERROR(append_nulls(num_rows));
*ptr += get_field()->size() * num_rows;
return Status::OK();
}
if (non_null_count == num_rows) {
// All non-NULL: use normal append_data which handles both data and null bitmap
return append_data(ptr, num_rows);
}
// Process by runs
for (const auto& run : _null_run_buffer) {
size_t run_length = run.len;
if (run.is_null) {
RETURN_IF_ERROR(append_nulls(run_length));
*ptr += get_field()->size() * run_length;
} else {
// TODO:
// 1. `*ptr += get_field()->size() * step;` should be done in this function, not in append_data;
// 2. support vectorized loads for arrays and advancing the ptr offset here
RETURN_IF_ERROR(append_data(ptr, run_length));
}
}
return Status::OK();
}
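// Worked example for the memchr-based run splitting above (illustrative only):
//   null_map = {0, 0, 1, 1, 1, 0}, num_rows = 6
//   offset 0: is_null = false, memchr looks for 1 and finds index 2 -> run {false, 2}
//   offset 2: is_null = true,  memchr looks for 0 and finds index 5 -> run {true, 3}
//   offset 5: is_null = false, memchr finds nothing                 -> run {false, 1}
// non_null_count == 3, so neither the all-NULL nor the all-non-NULL fast path
// applies and the three runs are replayed as append_data / append_nulls calls.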
uint64_t ScalarColumnWriter::estimate_buffer_size() {
uint64_t size = _data_size;
size += _page_builder->size();
if (is_nullable()) {
size += _null_bitmap_builder->size();
}
size += _ordinal_index_builder->size();
if (_opts.need_zone_map) {
size += _zone_map_index_builder->size();
}
if (_opts.need_bloom_filter) {
size += _bloom_filter_index_builder->size();
}
return size;
}
Status ScalarColumnWriter::finish() {
RETURN_IF_ERROR(finish_current_page());
_opts.meta->set_num_rows(_next_rowid);
return Status::OK();
}
Status ScalarColumnWriter::write_data() {
auto offset = _file_writer->bytes_appended();
auto collect_uncompressed_bytes = [](const PageFooterPB& footer) {
return footer.uncompressed_size() + footer.ByteSizeLong() +
sizeof(uint32_t) /* footer size */ + sizeof(uint32_t) /* checksum */;
};
for (auto& page : _pages) {
_total_uncompressed_data_pages_size += collect_uncompressed_bytes(page->footer);
RETURN_IF_ERROR(_write_data_page(page.get()));
}
_pages.clear();
// write column dict
if (_encoding_info->encoding() == DICT_ENCODING) {
OwnedSlice dict_body;
RETURN_IF_ERROR(_page_builder->get_dictionary_page(&dict_body));
EncodingTypePB dict_word_page_encoding;
RETURN_IF_ERROR(_page_builder->get_dictionary_page_encoding(&dict_word_page_encoding));
PageFooterPB footer;
footer.set_type(DICTIONARY_PAGE);
footer.set_uncompressed_size(cast_set<uint32_t>(dict_body.slice().get_size()));
footer.mutable_dict_page_footer()->set_encoding(dict_word_page_encoding);
_total_uncompressed_data_pages_size += collect_uncompressed_bytes(footer);
PagePointer dict_pp;
RETURN_IF_ERROR(PageIO::compress_and_write_page(
_compress_codec, _opts.compression_min_space_saving, _file_writer,
{dict_body.slice()}, footer, &dict_pp));
dict_pp.to_proto(_opts.meta->mutable_dict_page());
}
_total_compressed_data_pages_size += _file_writer->bytes_appended() - offset;
_page_builder.reset();
return Status::OK();
}
Status ScalarColumnWriter::write_ordinal_index() {
return _ordinal_index_builder->finish(_file_writer, _opts.meta->add_indexes());
}
Status ScalarColumnWriter::write_zone_map() {
if (_opts.need_zone_map) {
return _zone_map_index_builder->finish(_file_writer, _opts.meta->add_indexes());
}
return Status::OK();
}
Status ScalarColumnWriter::write_inverted_index() {
if (_opts.need_inverted_index) {
for (const auto& builder : _inverted_index_builders) {
RETURN_IF_ERROR(builder->finish());
}
}
return Status::OK();
}
Status ScalarColumnWriter::write_bloom_filter_index() {
if (_opts.need_bloom_filter) {
return _bloom_filter_index_builder->finish(_file_writer, _opts.meta->add_indexes());
}
return Status::OK();
}
// write a data page into file and update ordinal index
Status ScalarColumnWriter::_write_data_page(Page* page) {
PagePointer pp;
std::vector<Slice> compressed_body;
for (auto& data : page->data) {
compressed_body.push_back(data.slice());
}
RETURN_IF_ERROR(PageIO::write_page(_file_writer, compressed_body, page->footer, &pp));
_ordinal_index_builder->append_entry(page->footer.data_page_footer().first_ordinal(), pp);
return Status::OK();
}
Status ScalarColumnWriter::finish_current_page() {
if (_next_rowid == _first_rowid) {
return Status::OK();
}
if (_opts.need_zone_map) {
// If the number of rows in the current page is less than the threshold,
// we will invalidate the zone map index for this page by setting pass_all to true.
if (_next_rowid - _first_rowid < config::zone_map_row_num_threshold) {
_zone_map_index_builder->invalid_page_zone_map();
}
RETURN_IF_ERROR(_zone_map_index_builder->flush());
}
if (_opts.need_bloom_filter) {
RETURN_IF_ERROR(_bloom_filter_index_builder->flush());
}
_raw_data_bytes += _page_builder->get_raw_data_size();
// build data page body : encoded values + [nullmap]
std::vector<Slice> body;
OwnedSlice encoded_values;
RETURN_IF_ERROR(_page_builder->finish(&encoded_values));
RETURN_IF_ERROR(_page_builder->reset());
body.push_back(encoded_values.slice());
OwnedSlice nullmap;
if (_null_bitmap_builder != nullptr) {
if (is_nullable() && _null_bitmap_builder->has_null()) {
RETURN_IF_ERROR(_null_bitmap_builder->finish(&nullmap));
body.push_back(nullmap.slice());
}
_null_bitmap_builder->reset();
}
// prepare data page footer
std::unique_ptr<Page> page(new Page());
page->footer.set_type(DATA_PAGE);
page->footer.set_uncompressed_size(cast_set<uint32_t>(Slice::compute_total_size(body)));
auto* data_page_footer = page->footer.mutable_data_page_footer();
data_page_footer->set_first_ordinal(_first_rowid);
data_page_footer->set_num_values(_next_rowid - _first_rowid);
data_page_footer->set_nullmap_size(cast_set<uint32_t>(nullmap.slice().size));
if (_new_page_callback != nullptr) {
_new_page_callback->put_extra_info_in_page(data_page_footer);
}
// trying to compress page body
OwnedSlice compressed_body;
RETURN_IF_ERROR(PageIO::compress_page_body(_compress_codec, _opts.compression_min_space_saving,
body, &compressed_body));
if (compressed_body.slice().empty()) {
// page body is uncompressed
page->data.emplace_back(std::move(encoded_values));
page->data.emplace_back(std::move(nullmap));
} else {
// page body is compressed
page->data.emplace_back(std::move(compressed_body));
}
_push_back_page(std::move(page));
_first_rowid = _next_rowid;
return Status::OK();
}
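// Schematic of the page produced by finish_current_page() (descriptive sketch,
// derived from the code above):
//
//   page body (compressed as a whole when compression saves enough space):
//     [ encoded values ][ RLE null map, present only if the page has nulls ]
//   DataPageFooterPB:
//     first_ordinal = _first_rowid
//     num_values    = _next_rowid - _first_rowid
//     nullmap_size  = size of the null map slice (0 when absent)
//     plus optional extra info from _new_page_callback, e.g.
//     next_array_item_ordinal for offset columns
//
// write_data() later flushes the buffered pages and records each page's
// first_ordinal together with its PagePointer in the ordinal index.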
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////
// offset column writer
////////////////////////////////////////////////////////////////////////////////
OffsetColumnWriter::OffsetColumnWriter(const ColumnWriterOptions& opts,
std::unique_ptr<Field> field, io::FileWriter* file_writer)
: ScalarColumnWriter(opts, std::move(field), file_writer) {
// for now, data in the offset column is always interpreted as uint64
DCHECK(get_field()->type() == FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT);
}
OffsetColumnWriter::~OffsetColumnWriter() = default;
Status OffsetColumnWriter::init() {
RETURN_IF_ERROR(ScalarColumnWriter::init());
register_flush_page_callback(this);
_next_offset = 0;
return Status::OK();
}
Status OffsetColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
size_t remaining = num_rows;
while (remaining > 0) {
size_t num_written = remaining;
RETURN_IF_ERROR(append_data_in_current_page(ptr, &num_written));
// After append_data_in_current_page, _next_offset holds the offset of the next data,
// which will be used in finish_current_page() to set next_array_item_ordinal
_next_offset = *(const uint64_t*)(*ptr);
remaining -= num_written;
if (_page_builder->is_page_full()) {
// get next data for next array_item_rowid
RETURN_IF_ERROR(finish_current_page());
}
}
return Status::OK();
}
void OffsetColumnWriter::put_extra_info_in_page(DataPageFooterPB* footer) {
footer->set_next_array_item_ordinal(_next_offset);
}
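// Illustrative example for the offset column (the concrete values are
// assumptions): if the arrays flushed in a page occupy item ordinals [0, 5)
// in the item column, the offset value read right after the appended rows is
// 5, so _next_offset == 5 and put_extra_info_in_page() records it as
// next_array_item_ordinal. A reader can then tell where the last array of the
// page ends without touching the next page.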
StructColumnWriter::StructColumnWriter(
const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
ScalarColumnWriter* null_writer,
std::vector<std::unique_ptr<ColumnWriter>>& sub_column_writers)
: ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta), _opts(opts) {
for (auto& sub_column_writer : sub_column_writers) {
_sub_column_writers.push_back(std::move(sub_column_writer));
}
_num_sub_column_writers = _sub_column_writers.size();
DCHECK(_num_sub_column_writers >= 1);
if (is_nullable()) {
_null_writer.reset(null_writer);
}
}
Status StructColumnWriter::init() {
for (auto& column_writer : _sub_column_writers) {
RETURN_IF_ERROR(column_writer->init());
}
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->init());
}
return Status::OK();
}
Status StructColumnWriter::write_inverted_index() {
if (_opts.need_inverted_index) {
for (auto& column_writer : _sub_column_writers) {
RETURN_IF_ERROR(column_writer->write_inverted_index());
}
}
return Status::OK();
}
Status StructColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
size_t num_rows) {
RETURN_IF_ERROR(append_data(ptr, num_rows));
RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
return Status::OK();
}
Status StructColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
const auto* results = reinterpret_cast<const uint64_t*>(*ptr);
for (size_t i = 0; i < _num_sub_column_writers; ++i) {
auto nullmap = *(results + _num_sub_column_writers + i);
auto data = *(results + i);
RETURN_IF_ERROR(_sub_column_writers[i]->append(reinterpret_cast<const uint8_t*>(nullmap),
reinterpret_cast<const void*>(data),
num_rows));
}
return Status::OK();
}
uint64_t StructColumnWriter::estimate_buffer_size() {
uint64_t size = 0;
for (auto& column_writer : _sub_column_writers) {
size += column_writer->estimate_buffer_size();
}
size += is_nullable() ? _null_writer->estimate_buffer_size() : 0;
return size;
}
Status StructColumnWriter::finish() {
for (auto& column_writer : _sub_column_writers) {
RETURN_IF_ERROR(column_writer->finish());
}
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->finish());
}
_opts.meta->set_num_rows(get_next_rowid());
return Status::OK();
}
Status StructColumnWriter::write_data() {
for (auto& column_writer : _sub_column_writers) {
RETURN_IF_ERROR(column_writer->write_data());
}
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->write_data());
}
return Status::OK();
}
Status StructColumnWriter::write_ordinal_index() {
for (auto& column_writer : _sub_column_writers) {
RETURN_IF_ERROR(column_writer->write_ordinal_index());
}
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->write_ordinal_index());
}
return Status::OK();
}
Status StructColumnWriter::append_nulls(size_t num_rows) {
for (auto& column_writer : _sub_column_writers) {
RETURN_IF_ERROR(column_writer->append_nulls(num_rows));
}
if (is_nullable()) {
std::vector<vectorized::UInt8> null_signs(num_rows, 1);
const uint8_t* null_sign_ptr = null_signs.data();
RETURN_IF_ERROR(_null_writer->append_data(&null_sign_ptr, num_rows));
}
return Status::OK();
}
Status StructColumnWriter::finish_current_page() {
return Status::NotSupported("struct writer has no data, can not finish_current_page");
}
ArrayColumnWriter::ArrayColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
OffsetColumnWriter* offset_writer,
ScalarColumnWriter* null_writer,
std::unique_ptr<ColumnWriter> item_writer)
: ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta),
_item_writer(std::move(item_writer)),
_opts(opts) {
_offset_writer.reset(offset_writer);
if (is_nullable()) {
_null_writer.reset(null_writer);
}
}
Status ArrayColumnWriter::init() {
RETURN_IF_ERROR(_offset_writer->init());
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->init());
}
RETURN_IF_ERROR(_item_writer->init());
if (_opts.need_inverted_index) {
auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
if (writer != nullptr) {
RETURN_IF_ERROR(IndexColumnWriter::create(get_field(), &_inverted_index_writer,
_opts.index_file_writer,
_opts.inverted_indexes[0]));
}
}
if (_opts.need_ann_index) {
auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
if (writer != nullptr) {
_ann_index_writer = std::make_unique<AnnIndexColumnWriter>(_opts.index_file_writer,
_opts.ann_index);
RETURN_IF_ERROR(_ann_index_writer->init());
}
}
return Status::OK();
}
Status ArrayColumnWriter::write_inverted_index() {
if (_opts.need_inverted_index) {
return _inverted_index_writer->finish();
}
return Status::OK();
}
Status ArrayColumnWriter::write_ann_index() {
if (_opts.need_ann_index) {
return _ann_index_writer->finish();
}
return Status::OK();
}
// batch append data for array
Status ArrayColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
// data_ptr contains
// [size, offset_ptr, item_data_ptr, item_nullmap_ptr]
auto data_ptr = reinterpret_cast<const uint64_t*>(*ptr);
// total number of elements
size_t element_cnt = size_t((unsigned long)(*data_ptr));
auto offset_data = *(data_ptr + 1);
const uint8_t* offsets_ptr = (const uint8_t*)offset_data;
auto data = *(data_ptr + 2);
auto nested_null_map = *(data_ptr + 3);
if (element_cnt > 0) {
RETURN_IF_ERROR(_item_writer->append(reinterpret_cast<const uint8_t*>(nested_null_map),
reinterpret_cast<const void*>(data), element_cnt));
}
if (_opts.need_inverted_index) {
auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
// currently only scalar nested types are supported
if (writer != nullptr) {
// NOTE: the array field name is used as the index field, but the item_writer's
// field size must be used when advancing item_data_ptr
RETURN_IF_ERROR(_inverted_index_writer->add_array_values(
_item_writer->get_field()->size(), reinterpret_cast<const void*>(data),
reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows));
}
}
if (_opts.need_ann_index) {
auto* writer = dynamic_cast<ScalarColumnWriter*>(_item_writer.get());
// currently only scalar nested types are supported
if (writer != nullptr) {
// NOTE: the array field name is used as the index field, but the item_writer's
// field size must be used when advancing item_data_ptr
RETURN_IF_ERROR(_ann_index_writer->add_array_values(
_item_writer->get_field()->size(), reinterpret_cast<const void*>(data),
reinterpret_cast<const uint8_t*>(nested_null_map), offsets_ptr, num_rows));
} else {
return Status::NotSupported(
        "ANN index can only be built on an array with a scalar item type, "
        "but got {} as the nested type",
        _item_writer->get_field()->type());
}
}
RETURN_IF_ERROR(_offset_writer->append_data(&offsets_ptr, num_rows));
return Status::OK();
}
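// Schematic of the input block consumed by ArrayColumnWriter::append_data()
// (a sketch of the layout described in the comment above; each slot is a
// uint64_t):
//
//   data_ptr[0] : element_cnt           -- total number of items in the batch
//   data_ptr[1] : offsets pointer       -- per-row offsets fed to _offset_writer
//   data_ptr[2] : item data pointer     -- flattened items for _item_writer
//   data_ptr[3] : item null map pointer -- nullability of the flattened items
//
// e.g. the rows [1, 2] and [3] arrive as element_cnt = 3, two offset entries,
// three items and a three-byte item null map.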
uint64_t ArrayColumnWriter::estimate_buffer_size() {
return _offset_writer->estimate_buffer_size() +
(is_nullable() ? _null_writer->estimate_buffer_size() : 0) +
_item_writer->estimate_buffer_size();
}
Status ArrayColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
size_t num_rows) {
RETURN_IF_ERROR(append_data(ptr, num_rows));
if (is_nullable()) {
if (_opts.need_inverted_index) {
RETURN_IF_ERROR(_inverted_index_writer->add_array_nulls(null_map, num_rows));
}
RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
}
return Status::OK();
}
Status ArrayColumnWriter::finish() {
RETURN_IF_ERROR(_offset_writer->finish());
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->finish());
}
RETURN_IF_ERROR(_item_writer->finish());
_opts.meta->set_num_rows(get_next_rowid());
return Status::OK();
}
Status ArrayColumnWriter::write_data() {
RETURN_IF_ERROR(_offset_writer->write_data());
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->write_data());
}
RETURN_IF_ERROR(_item_writer->write_data());
return Status::OK();
}
Status ArrayColumnWriter::write_ordinal_index() {
RETURN_IF_ERROR(_offset_writer->write_ordinal_index());
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->write_ordinal_index());
}
if (!has_empty_items()) {
RETURN_IF_ERROR(_item_writer->write_ordinal_index());
}
return Status::OK();
}
Status ArrayColumnWriter::append_nulls(size_t num_rows) {
size_t num_lengths = num_rows;
const ordinal_t offset = _item_writer->get_next_rowid();
while (num_lengths > 0) {
// TODO llj bulk write
const auto* offset_ptr = reinterpret_cast<const uint8_t*>(&offset);
RETURN_IF_ERROR(_offset_writer->append_data(&offset_ptr, 1));
--num_lengths;
}
return write_null_column(num_rows, true);
}
Status ArrayColumnWriter::write_null_column(size_t num_rows, bool is_null) {
uint8_t null_sign = is_null ? 1 : 0;
while (is_nullable() && num_rows > 0) {
// TODO llj bulk write
const uint8_t* null_sign_ptr = &null_sign;
RETURN_IF_ERROR(_null_writer->append_data(&null_sign_ptr, 1));
--num_rows;
}
return Status::OK();
}
Status ArrayColumnWriter::finish_current_page() {
return Status::NotSupported("array writer has no data, can not finish_current_page");
}
/// ============================= MapColumnWriter =====================////
MapColumnWriter::MapColumnWriter(const ColumnWriterOptions& opts, std::unique_ptr<Field> field,
ScalarColumnWriter* null_writer, OffsetColumnWriter* offset_writer,
std::vector<std::unique_ptr<ColumnWriter>>& kv_writers)
: ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta), _opts(opts) {
CHECK_EQ(kv_writers.size(), 2);
_offsets_writer.reset(offset_writer);
if (is_nullable()) {
_null_writer.reset(null_writer);
}
for (auto& sub_writers : kv_writers) {
_kv_writers.push_back(std::move(sub_writers));
}
}
Status MapColumnWriter::init() {
RETURN_IF_ERROR(_offsets_writer->init());
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->init());
}
// register_flush_page_callback is used so that put_extra_info_in_page() is
// called when the current data page is finished
for (auto& sub_writer : _kv_writers) {
RETURN_IF_ERROR(sub_writer->init());
}
return Status::OK();
}
uint64_t MapColumnWriter::estimate_buffer_size() {
size_t estimate = 0;
for (auto& sub_writer : _kv_writers) {
estimate += sub_writer->estimate_buffer_size();
}
estimate += _offsets_writer->estimate_buffer_size();
if (is_nullable()) {
estimate += _null_writer->estimate_buffer_size();
}
return estimate;
}
Status MapColumnWriter::finish() {
RETURN_IF_ERROR(_offsets_writer->finish());
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->finish());
}
for (auto& sub_writer : _kv_writers) {
RETURN_IF_ERROR(sub_writer->finish());
}
_opts.meta->set_num_rows(get_next_rowid());
return Status::OK();
}
Status MapColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
size_t num_rows) {
RETURN_IF_ERROR(append_data(ptr, num_rows));
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->append_data(&null_map, num_rows));
}
return Status::OK();
}
// write key value data with offsets
Status MapColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
// data_ptr contains
// [size, offset_ptr, key_data_ptr, val_data_ptr, k_nullmap_ptr, v_nullmap_ptr]
// These are the results converted by olap_map_convertor; a dedicated structure
// will replace this layout later.
auto data_ptr = reinterpret_cast<const uint64_t*>(*ptr);
// total number of elements
size_t element_cnt = size_t((unsigned long)(*data_ptr));
auto offset_data = *(data_ptr + 1);
const uint8_t* offsets_ptr = (const uint8_t*)offset_data;
if (element_cnt > 0) {
for (size_t i = 0; i < 2; ++i) {
auto data = *(data_ptr + 2 + i);
auto nested_null_map = *(data_ptr + 2 + 2 + i);
RETURN_IF_ERROR(
_kv_writers[i]->append(reinterpret_cast<const uint8_t*>(nested_null_map),
reinterpret_cast<const void*>(data), element_cnt));
}
}
// Keep this order: the offset writer must flush next_array_item_ordinal only after the
// kv_writers have appended their data, because _kv_writers[0]->get_next_rowid() is used
// to set next_array_item_ordinal in the offset page footer.
RETURN_IF_ERROR(_offsets_writer->append_data(&offsets_ptr, num_rows));
return Status::OK();
}
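// Schematic of the input block consumed by MapColumnWriter::append_data()
// (mirrors the array case; each slot is a uint64_t):
//
//   data_ptr[0] : element_cnt    data_ptr[1] : offsets pointer
//   data_ptr[2] : key data       data_ptr[3] : value data
//   data_ptr[4] : key null map   data_ptr[5] : value null map
//
// Keys and values go through _kv_writers[0] / _kv_writers[1] before the
// offsets are appended, matching the ordering requirement noted above.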
Status MapColumnWriter::write_data() {
RETURN_IF_ERROR(_offsets_writer->write_data());
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->write_data());
}
for (auto& sub_writer : _kv_writers) {
RETURN_IF_ERROR(sub_writer->write_data());
}
return Status::OK();
}
Status MapColumnWriter::write_ordinal_index() {
RETURN_IF_ERROR(_offsets_writer->write_ordinal_index());
if (is_nullable()) {
RETURN_IF_ERROR(_null_writer->write_ordinal_index());
}
for (auto& sub_writer : _kv_writers) {
if (sub_writer->get_next_rowid() != 0) {
RETURN_IF_ERROR(sub_writer->write_ordinal_index());
}
}
return Status::OK();
}
Status MapColumnWriter::append_nulls(size_t num_rows) {
for (auto& sub_writer : _kv_writers) {
RETURN_IF_ERROR(sub_writer->append_nulls(num_rows));
}
const ordinal_t offset = _kv_writers[0]->get_next_rowid();
std::vector<vectorized::UInt8> offsets_data(num_rows, cast_set<uint8_t>(offset));
const uint8_t* offsets_ptr = offsets_data.data();
RETURN_IF_ERROR(_offsets_writer->append_data(&offsets_ptr, num_rows));
if (is_nullable()) {
std::vector<vectorized::UInt8> null_signs(num_rows, 1);
const uint8_t* null_sign_ptr = null_signs.data();
RETURN_IF_ERROR(_null_writer->append_data(&null_sign_ptr, num_rows));
}
return Status::OK();
}
Status MapColumnWriter::finish_current_page() {
return Status::NotSupported("map writer has no data, can not finish_current_page");
}
Status MapColumnWriter::write_inverted_index() {
if (_opts.need_inverted_index) {
return _index_builder->finish();
}
return Status::OK();
}
VariantColumnWriter::VariantColumnWriter(const ColumnWriterOptions& opts,
const TabletColumn* column, std::unique_ptr<Field> field)
: ColumnWriter(std::move(field), opts.meta->is_nullable(), opts.meta) {
_impl = std::make_unique<VariantColumnWriterImpl>(opts, column);
}
Status VariantColumnWriter::init() {
return _impl->init();
}
Status VariantColumnWriter::append_data(const uint8_t** ptr, size_t num_rows) {
_next_rowid += num_rows;
return _impl->append_data(ptr, num_rows);
}
uint64_t VariantColumnWriter::estimate_buffer_size() {
return _impl->estimate_buffer_size();
}
Status VariantColumnWriter::finish() {
return _impl->finish();
}
Status VariantColumnWriter::write_data() {
return _impl->write_data();
}
Status VariantColumnWriter::write_ordinal_index() {
return _impl->write_ordinal_index();
}
Status VariantColumnWriter::write_zone_map() {
return _impl->write_zone_map();
}
Status VariantColumnWriter::write_inverted_index() {
return _impl->write_inverted_index();
}
Status VariantColumnWriter::write_bloom_filter_index() {
return _impl->write_bloom_filter_index();
}
Status VariantColumnWriter::append_nullable(const uint8_t* null_map, const uint8_t** ptr,
size_t num_rows) {
return _impl->append_nullable(null_map, ptr, num_rows);
}
#include "common/compile_check_end.h"
} // namespace doris::segment_v2