blob: afc669f95bc63db5578345d1393cd7d4150ef37d [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
// IWYU pragma: no_include <bits/chrono.h>
#include <fmt/format.h>
#include <gen_cpp/FrontendService.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <glog/logging.h>
#include <algorithm>
#include <cstdint>
#include <functional>
#include <iterator>
#include <memory>
#include <set>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
#include "common/status.h"
#include "core/block/block.h"
#include "core/custom_allocator.h"
#include "exec/sink/vtablet_block_convertor.h"
#include "exec/sink/vtablet_finder.h"
#include "exprs/vexpr_context.h"
#include "exprs/vexpr_fwd.h"
#include "runtime/runtime_profile.h"
#include "runtime/runtime_state.h"
#include "storage/tablet_info.h"
namespace doris {
// Forward declarations. Neither name is referenced by the code in this header.
// NOTE(review): presumably kept so that includers can name these types without
// pulling in their full headers — confirm before removing.
class IndexChannel;
class VNodeChannel;
// <row_idx, partition_id, tablet_id>
class RowPartTabletIds {
public:
DorisVector<uint32_t> row_ids;
DorisVector<int64_t> partition_ids;
DorisVector<int64_t> tablet_ids;
std::string debug_string() const {
std::string value;
value.reserve(row_ids.size() * 15);
for (int i = 0; i < row_ids.size(); i++) {
value.append(fmt::format("[{}, {}, {}]", row_ids[i], partition_ids[i], tablet_ids[i]));
}
return value;
}
};
// Hook invoked to create partitions for an auto-partition table.
// First argument: an opaque pointer to the caller (presumably the `caller` stored next to
// the callback in VRowDistribution — confirm at the call site).
// Second argument: the TCreatePartitionResult to be handled by the hook.
using CreatePartitionCallback = Status (*)(void* caller, TCreatePartitionResult* result);
// Routes the rows of an input block to (partition, tablet) pairs for every index
// (see generate_rows_distribution). For auto-partition tables it also collects the
// partition values that do not exist yet, so they can be created through
// automatic_create_partition(), and batches the affected rows meanwhile.
class VRowDistribution {
public:
    // only used to pass parameters for VRowDistribution
    struct VRowDistributionContext {
        RuntimeState* state = nullptr;
        OlapTableBlockConvertor* block_convertor = nullptr;
        OlapTabletFinder* tablet_finder = nullptr;
        VOlapTablePartitionParam* vpartition = nullptr;
        RuntimeProfile::Counter* add_partition_request_timer = nullptr;
        int64_t txn_id = -1;
        ObjectPool* pool = nullptr;
        OlapTableLocationParam* location = nullptr;
        const VExprContextSPtrs* vec_output_expr_ctxs = nullptr;
        std::shared_ptr<OlapTableSchemaParam> schema;
        void* caller = nullptr;
        bool write_single_replica = false;
        // Default to nullptr like every other field: init() copies this value, so leaving it
        // indeterminate in a default-constructed context would be undefined behavior.
        CreatePartitionCallback create_partition_callback = nullptr;
    };

    friend class VTabletWriter;
    friend class VTabletWriterV2;

    VRowDistribution() = default;
    virtual ~VRowDistribution() = default;

    // Copies all collaborators out of `ctx`. `ctx.state` must be non-null: it is
    // dereferenced here to derive the batch size.
    void init(VRowDistributionContext ctx) {
        _state = ctx.state;
        // _batch_size is never below 8192, even if the state's batch_size() is smaller.
        _batch_size = std::max(_state->batch_size(), 8192);
        _block_convertor = ctx.block_convertor;
        _tablet_finder = ctx.tablet_finder;
        _vpartition = ctx.vpartition;
        _add_partition_request_timer = ctx.add_partition_request_timer;
        _txn_id = ctx.txn_id;
        _pool = ctx.pool;
        _location = ctx.location;
        _vec_output_expr_ctxs = ctx.vec_output_expr_ctxs;
        // `ctx` is our own by-value copy, so take its reference instead of bumping the count.
        _schema = std::move(ctx.schema);
        _caller = ctx.caller;
        _write_single_replica = ctx.write_single_replica;
        _create_partition_callback = ctx.create_partition_callback;
    }

    // Publishes every recorded auto-partition request time as the info string
    // "AddPartitionRequestTimeList" ("[t1, t2, ...]"). Does nothing when no request was made.
    void output_profile_info(RuntimeProfile* profile) {
        if (_add_partition_request_times.empty()) {
            return;
        }
        std::string time_list = "[";
        for (size_t i = 0; i < _add_partition_request_times.size(); ++i) {
            if (i > 0) {
                time_list += ", ";
            }
            time_list += PrettyPrinter::print(_add_partition_request_times[i], TUnit::TIME_NS);
        }
        time_list += "]";
        profile->add_info_string("AddPartitionRequestTimeList", time_list);
    }

    // Prepares and opens the partition-function expressions (auto-partition tables only)
    // and every index's where clause against `output_row_desc`.
    Status open(RowDescriptor* output_row_desc) {
        if (_vpartition->is_auto_partition()) {
            // only the expression contexts are needed here; the functions are unused
            auto part_ctxs = _get_partition_function().first;
            // bind by reference: avoids an atomic refcount inc/dec per context
            for (const auto& part_ctx : part_ctxs) {
                RETURN_IF_ERROR(part_ctx->prepare(_state, *output_row_desc));
                RETURN_IF_ERROR(part_ctx->open(_state));
            }
        }
        for (const auto& index : _schema->indexes()) {
            const auto& where_clause = index->where_clause;
            if (where_clause != nullptr) {
                RETURN_IF_ERROR(where_clause->prepare(_state, *output_row_desc));
                RETURN_IF_ERROR(where_clause->open(_state));
            }
        }
        return Status::OK();
    }

    // auto partition
    // mv where clause
    // v1 needs index->node->row_ids - tabletids
    // v2 needs index,tablet->rowids
    Status generate_rows_distribution(Block& input_block, std::shared_ptr<Block>& block,
                                      std::vector<RowPartTabletIds>& row_part_tablet_ids,
                                      int64_t& rows_stat_val);

    // There are 2 ways to signal that the batching block must be dealt with:
    // 1. in row_distribution, when _batching_rows reaches the threshold, this class sets
    //    _deal_batched = true.
    // 2. in the caller, after the last block and before close, it sets _deal_batched = true.
    bool need_deal_batching() const { return _deal_batched && _batching_rows > 0; }

    // Creates the partitions needed by an auto-partition table, using _partitions_need_create.
    Status automatic_create_partition();

    void clear_batching_stats();

    // Per-row skip flags from the last round.
    const std::vector<bool>& get_skipped() const { return _skip; }

    // for auto partition
    std::unique_ptr<MutableBlock> _batching_block; // same structure as input_block
    bool _deal_batched = false; // If true, send batched block before any block's append.

private:
    std::pair<VExprContextSPtrs, VExprSPtrs> _get_partition_function();

    Status _save_missing_values(const Block& input_block,
                                std::vector<std::vector<std::string>>& col_strs, int col_size,
                                Block* block, const std::vector<uint32_t>& filter,
                                const std::vector<const NullMap*>& col_null_maps);

    void _get_tablet_ids(Block* block, int32_t index_idx, std::vector<int64_t>& tablet_ids);

    void _filter_block_by_skip(Block* block, RowPartTabletIds& row_part_tablet_id);

    Status _filter_block_by_skip_and_where_clause(Block* block,
                                                  const VExprContextSPtr& where_clause,
                                                  RowPartTabletIds& row_part_tablet_id);

    Status _filter_block(Block* block, std::vector<RowPartTabletIds>& row_part_tablet_ids);

    Status _generate_rows_distribution_for_auto_partition(
            const Block& input_block, Block* block, const std::vector<uint16_t>& partition_col_idx,
            bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
            int64_t& rows_stat_val);

    // The whole process of dealing with missing rows; calls _save_missing_values.
    Status _deal_missing_map(const Block& input_block, Block* block,
                             const std::vector<uint16_t>& partition_cols_idx,
                             int64_t& rows_stat_val);

    Status _generate_rows_distribution_for_non_auto_partition(
            Block* block, bool has_filtered_rows,
            std::vector<RowPartTabletIds>& row_part_tablet_ids);

    Status _generate_rows_distribution_for_auto_overwrite(
            const Block& input_block, Block* block, const std::vector<uint16_t>& partition_cols_idx,
            bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
            int64_t& rows_stat_val);

    Status _replace_overwriting_partition();

    void _reset_row_part_tablet_ids(std::vector<RowPartTabletIds>& row_part_tablet_ids,
                                    int64_t rows);

    void _reset_find_tablets(int64_t rows);

    // Hash for a list of nullable string literals, used to deduplicate partition values.
    struct NullableStringListHash {
        // Hash of a single literal; every NULL literal hashes to 0.
        std::size_t _hash(const TNullableStringLiteral& arg) const {
            if (arg.is_null) {
                return 0;
            }
            return std::hash<std::string>()(arg.value);
        }

        std::size_t operator()(const std::vector<TNullableStringLiteral>& arg) const {
            std::size_t result = 0;
            for (const auto& v : arg) {
                // boost::hash_combine-style mixing. A plain `(result << 1) ^ h` lets [], [NULL]
                // and [NULL, NULL] all collide at 0 (NULL hashes to 0), and shifts leading
                // elements out entirely after 64 columns.
                result ^= _hash(v) + 0x9e3779b9 + (result << 6) + (result >> 2);
            }
            return result;
        }
    };

    RuntimeState* _state = nullptr;
    int _batch_size = 0;

    // for auto partitions
    std::vector<std::vector<TNullableStringLiteral>> _partitions_need_create;
    size_t _batching_rows = 0;
    size_t _batching_bytes = 0;
    std::unordered_set<std::vector<TNullableStringLiteral>, NullableStringListHash> _deduper;

    OlapTableBlockConvertor* _block_convertor = nullptr;
    OlapTabletFinder* _tablet_finder = nullptr;
    VOlapTablePartitionParam* _vpartition = nullptr;
    RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
    int64_t _txn_id = -1;
    ObjectPool* _pool = nullptr;
    OlapTableLocationParam* _location = nullptr;
    // Record each auto-partition request time for detailed profiling
    std::vector<int64_t> _add_partition_request_times;
    // int64_t _number_output_rows = 0;
    const VExprContextSPtrs* _vec_output_expr_ctxs = nullptr;
    // generally it's writer's on_partitions_created
    CreatePartitionCallback _create_partition_callback = nullptr;
    void* _caller = nullptr;
    std::shared_ptr<OlapTableSchemaParam> _schema;
    bool _write_single_replica = false;

    // reuse for find_tablet. save partitions found by find_tablets
    std::vector<VOlapTablePartition*> _partitions;
    std::vector<bool> _skip;
    std::vector<uint32_t> _tablet_indexes;
    std::vector<int64_t> _tablet_ids;
    std::vector<uint32_t> _missing_map; // indices of missing values in partition_col

    // for auto detect overwrite partition
    std::set<int64_t> _new_partition_ids; // if contains, not to replace it again.
};
} // namespace doris