// blob: edb4ecbe0631be5ed07aee37f244da4c2efd0a39 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <stdint.h>
#include <cstdint>
#include <memory>
#include "common/status.h"
#include "pipeline/common/distinct_agg_utils.h"
#include "pipeline/exec/operator.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
namespace doris {
class ExecNode;
class RuntimeState;
namespace pipeline {
#include "common/compile_check_begin.h"
class DistinctStreamingAggOperatorX;
// Local (per pipeline task) state of the distinct streaming aggregation operator.
//
// This header only declares the state: the distinct hash-table data (`_agg_data`),
// the blocks used while processing, and the profile counters. Apart from
// `_swap_cache_block`, every member function is defined out of line.
class DistinctStreamingAggLocalState final : public PipelineXLocalState<FakeSharedState> {
public:
    using Base = PipelineXLocalState<FakeSharedState>;
    using Parent = DistinctStreamingAggOperatorX;
    ENABLE_FACTORY_CREATOR(DistinctStreamingAggLocalState);

    DistinctStreamingAggLocalState(RuntimeState* state, OperatorXBase* parent);

    // Lifecycle hooks overriding the base local state.
    Status init(RuntimeState* state, LocalStateInfo& info) override;
    Status open(RuntimeState* state) override;
    Status close(RuntimeState* state) override;

private:
    // The owning operator and the stateful-operator template read and write this
    // state directly.
    friend class DistinctStreamingAggOperatorX;
    template <typename LocalStateType>
    friend class StatefulOperatorX;

    // Defined out of line. Going by the name, runs the distinct pre-aggregation of
    // `in_block` and writes the result to `out_block` -- confirm against the .cpp.
    Status _distinct_pre_agg_with_serialized_key(vectorized::Block* in_block,
                                                 vectorized::Block* out_block);

    // Defined out of line; takes the probe (group-by) expression contexts.
    Status _init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs);

    // Defined out of line. `distinct_row` is an in/out selector, `key_columns` the
    // key columns of the current input and `num_rows` its row count
    // (NOTE(review): semantics inferred from the names -- confirm against the .cpp).
    void _emplace_into_hash_table_to_distinct(vectorized::IColumn::Selector& distinct_row,
                                              vectorized::ColumnRawPtrs& key_columns,
                                              const uint32_t num_rows);

    void _make_nullable_output_key(vectorized::Block* block);
    bool _should_expand_preagg_hash_tables();

    // Exchanges the contents of `block` and `_cache_block`, then resets
    // `_cache_block` to `clone_empty()` of what `block` now holds.
    // Debug builds check beforehand that `_cache_block` is not an empty-column block.
    void _swap_cache_block(vectorized::Block* block) {
        DCHECK(!_cache_block.is_empty_column());
        vectorized::Block& target = *block;
        target.swap(_cache_block);
        _cache_block = target.clone_empty();
    }

    // NOTE: member order below is kept as-is; it determines initialization order.
    vectorized::IColumn::Selector _distinct_row;
    vectorized::Arena _arena;
    size_t _input_num_rows = 0;
    bool _should_expand_hash_table = true;
    bool _stop_emplace_flag = false;
    // Has no default initializer, so the constructor must set it.
    const int batch_size;
    std::unique_ptr<vectorized::Arena> _agg_arena_pool = nullptr;
    std::unique_ptr<DistinctDataVariants> _agg_data = nullptr;
    // Expression contexts of the group-by keys (e.g. `group by k1,k2`).
    vectorized::VExprContextSPtrs _probe_expr_ctxs;
    std::unique_ptr<vectorized::Arena> _agg_profile_arena = nullptr;
    std::unique_ptr<vectorized::Block> _child_block = nullptr;
    bool _child_eos = false;
    bool _reach_limit = false;
    std::unique_ptr<vectorized::Block> _aggregated_block = nullptr;
    // Block exchanged with the caller's block by `_swap_cache_block`.
    vectorized::Block _cache_block;

    // Profile counters; all start as nullptr and are presumably registered during
    // init() -- see the .cpp.
    RuntimeProfile::Counter* _build_timer = nullptr;
    RuntimeProfile::Counter* _expr_timer = nullptr;
    RuntimeProfile::Counter* _hash_table_compute_timer = nullptr;
    RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr;
    RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
    RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
    RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr;
};
// Operator for distinct streaming aggregation. It is a stateful operator whose
// per-task state lives in DistinctStreamingAggLocalState.
//
// Apart from the two data-distribution queries below, all member functions are
// defined out of line.
class DistinctStreamingAggOperatorX final
        : public StatefulOperatorX<DistinctStreamingAggLocalState> {
public:
    DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
                                  const DescriptorTbl& descs, bool require_bucket_distribution);
#ifdef BE_TEST
    // Test-only default constructor: sets the const members to fixed values.
    // The remaining members rely on their default member initializers.
    DistinctStreamingAggOperatorX()
            : _needs_finalize(false),
              _is_first_phase(true),
              _partition_exprs({}),
              _is_colocate(false),
              _require_bucket_distribution {false} {}
#endif

    Status init(const TPlanNode& tnode, RuntimeState* state) override;
    Status prepare(RuntimeState* state) override;
    Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) const override;
    Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override;
    bool need_more_input_data(RuntimeState* state) const override;

    // Returns the data distribution this operator requires from its input:
    //  - NOOP when finalizing and there are no probe (group-by) expressions;
    //  - otherwise, when finalizing, or when there are probe expressions and this is
    //    not a streaming pre-aggregation: a hash shuffle on `_partition_exprs`, which
    //    becomes a bucket hash shuffle when the operator is colocate, bucket
    //    distribution is required and it is not followed by a shuffled operator;
    //  - PASSTHROUGH in every other case.
    // `state` is not used.
    DataDistribution required_data_distribution(RuntimeState* state) const override {
        if (_needs_finalize && _probe_expr_ctxs.empty()) {
            return {ExchangeType::NOOP};
        }
        if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
            return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
                           ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                           : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
        }
        return {ExchangeType::PASSTHROUGH};
    }

    // Reports whether a data distribution is required; true exactly when colocate.
    bool require_data_distribution() const override { return _is_colocate; }

private:
    friend class DistinctStreamingAggLocalState;

    void init_make_nullable(RuntimeState* state);

    // Value-initialized so the BE_TEST default constructor, which does not set it,
    // never leaves it indeterminate. A constructor that initializes it explicitly
    // overrides this default.
    TupleId _output_tuple_id {};
    TupleDescriptor* _output_tuple_desc = nullptr;
    const bool _needs_finalize;
    const bool _is_first_phase;
    const std::vector<TExpr> _partition_exprs;
    const bool _is_colocate;
    const bool _require_bucket_distribution;
    // Expression contexts of the group-by keys (e.g. `group by k1,k2`).
    vectorized::VExprContextSPtrs _probe_expr_ctxs;
    std::vector<size_t> _make_nullable_keys;
    // If _is_streaming_preagg = true, deduplication will be abandoned in cases where
    // the deduplication rate is low.
    bool _is_streaming_preagg = false;
};
} // namespace pipeline
} // namespace doris
#include "common/compile_check_end.h"