blob: e6443b2a77e3fd2063a14d3af8653cef0d1fbff6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <stdint.h>
#include "common/be_mock_util.h"
#include "common/status.h"
#include "exec/operator/operator.h"
namespace doris {
class RuntimeState;
#include "common/compile_check_begin.h"
class AggSourceOperatorX;
class AggLocalState MOCK_REMOVE(final) : public PipelineXLocalState<AggSharedState> {
public:
using Base = PipelineXLocalState<AggSharedState>;
ENABLE_FACTORY_CREATOR(AggLocalState);
AggLocalState(RuntimeState* state, OperatorXBase* parent);
~AggLocalState() override = default;
Status init(RuntimeState* state, LocalStateInfo& info) override;
Status close(RuntimeState* state) override;
void make_nullable_output_key(Block* block);
Status merge_with_serialized_key_helper(Block* block);
void do_agg_limit(Block* block, bool* eos);
protected:
friend class AggSourceOperatorX;
Status _get_without_key_result(RuntimeState* state, Block* block, bool* eos);
Status _get_results_without_key(RuntimeState* state, Block* block, bool* eos);
Status _get_with_serialized_key_result(RuntimeState* state, Block* block, bool* eos);
Status _get_results_with_serialized_key(RuntimeState* state, Block* block, bool* eos);
Status _create_agg_status(AggregateDataPtr data);
void _make_nullable_output_key(Block* block) {
if (block->rows() != 0) {
auto& shared_state = *Base ::_shared_state;
for (auto cid : shared_state.make_nullable_keys) {
block->get_by_position(cid).column =
make_nullable(block->get_by_position(cid).column);
block->get_by_position(cid).type = make_nullable(block->get_by_position(cid).type);
}
}
}
void _emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns,
uint32_t num_rows);
PODArray<AggregateDataPtr> _places;
std::vector<char> _deserialize_buffer;
RuntimeProfile::Counter* _get_results_timer = nullptr;
RuntimeProfile::Counter* _hash_table_iterate_timer = nullptr;
RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr;
RuntimeProfile::Counter* _insert_values_to_column_timer = nullptr;
RuntimeProfile::Counter* _hash_table_compute_timer = nullptr;
RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr;
RuntimeProfile::Counter* _hash_table_input_counter = nullptr;
RuntimeProfile::Counter* _hash_table_size_counter = nullptr;
RuntimeProfile::Counter* _hash_table_memory_usage = nullptr;
RuntimeProfile::Counter* _merge_timer = nullptr;
RuntimeProfile::Counter* _deserialize_data_timer = nullptr;
RuntimeProfile::Counter* _memory_usage_container = nullptr;
RuntimeProfile::Counter* _memory_usage_arena = nullptr;
using vectorized_get_result =
std::function<Status(RuntimeState* state, Block* block, bool* eos)>;
struct executor {
vectorized_get_result get_result;
};
executor _executor;
};
class AggSourceOperatorX : public OperatorX<AggLocalState> {
public:
using Base = OperatorX<AggLocalState>;
AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
const DescriptorTbl& descs);
~AggSourceOperatorX() override = default;
#ifdef BE_TEST
AggSourceOperatorX() = default;
#endif
Status get_block(RuntimeState* state, Block* block, bool* eos) override;
bool is_source() const override { return true; }
Status merge_with_serialized_key_helper(RuntimeState* state, Block* block);
size_t get_estimated_memory_size_for_merging(RuntimeState* state, size_t rows) const;
Status reset_hash_table(RuntimeState* state);
/// Get a block of serialized intermediate aggregate states from the hash table.
/// Unlike get_block() which may finalize, this always outputs the serialized
/// intermediate format (key columns + serialized agg state columns), which is
/// the same format as the spill block. This is needed for repartitioning during
/// multi-level spill recovery: the data must be re-mergeable after repartitioning.
Status get_serialized_block(RuntimeState* state, Block* block, bool* eos);
private:
friend class AggLocalState;
bool _needs_finalize;
bool _without_key;
// left / full join will change the key nullable make output/input solt
// nullable diff. so we need make nullable of it.
std::vector<size_t> _make_nullable_keys;
};
} // namespace doris
#include "common/compile_check_end.h"