blob: 0bd5810614692fb062d29e45520b0a89d4c87011 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gen_cpp/internal_service.pb.h>
#include "common/status.h"
#include "operator.h"
#include "pipeline/exec/union_sink_operator.h"
#include "pipeline/rec_cte_shared_state.h"
#include "util/brpc_client_cache.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
namespace doris {
#include "common/compile_check_begin.h"
class RuntimeState;
namespace vectorized {
class Block;
} // namespace vectorized
namespace pipeline {
class DataQueue;
class RecCTESourceOperatorX;
/// Per-instance local state of the recursive-CTE source operator.
///
/// Besides the dependency inherited from the base local state, it exposes a
/// second "anchor" dependency; both are reported by dependencies().
class RecCTESourceLocalState final : public PipelineXLocalState<RecCTESharedState> {
public:
    ENABLE_FACTORY_CREATOR(RecCTESourceLocalState);
    using Base = PipelineXLocalState<RecCTESharedState>;
    using Parent = RecCTESourceOperatorX;

    RecCTESourceLocalState(RuntimeState* state, OperatorXBase* parent);

    Status init(RuntimeState* state, LocalStateInfo& info) override;
    Status open(RuntimeState* state) override;

    bool is_blockable() const override { return true; }

    /// Returns the inherited dependency followed by the anchor dependency
    /// (the latter may be null if it was never assigned).
    std::vector<Dependency*> dependencies() const override {
        Dependency* anchor = _anchor_dependency.get();
        return {_dependency, anchor};
    }

private:
    friend class RecCTESourceOperatorX;
    friend class OperatorX<RecCTESourceLocalState>;

    // Empty (null) by default; not assigned in this header — presumably set
    // up in init()/open() in the .cpp, confirm there.
    std::shared_ptr<Dependency> _anchor_dependency;
};
/// Serial source operator that drives a recursive CTE.
///
/// get_block() alternates between two modes, decided by the shared state:
///  * not ready to return: run one more recursion round (see
///    _recursive_process()), enforcing `_max_recursion_depth`;
///  * ready to return: hand buffered blocks back one per call, taken from the
///    back of the shared buffer, and report eos once the buffer is empty.
///
/// On close()/terminate() the fragments in `_fragments_to_reset` receive a
/// single `close` request, unless this CTE is used by another recursive CTE.
class RecCTESourceOperatorX : public OperatorX<RecCTESourceLocalState> {
public:
    using Base = OperatorX<RecCTESourceLocalState>;

    RecCTESourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
                          const DescriptorTbl& descs)
            : Base(pool, tnode, operator_id, descs),
              _is_union_all(tnode.rec_cte_node.is_union_all),
              _targets(tnode.rec_cte_node.targets),
              _fragments_to_reset(tnode.rec_cte_node.fragments_to_reset),
              _global_rf_ids(tnode.rec_cte_node.rec_side_runtime_filter_ids),
              _is_used_by_other_rec_cte(tnode.rec_cte_node.is_used_by_other_rec_cte) {
        DCHECK(tnode.__isset.rec_cte_node);
    }

    ~RecCTESourceOperatorX() override = default;

    Status init(const TPlanNode& tnode, RuntimeState* state) override;
    Status prepare(RuntimeState* state) override;

    /// Sends the one-time close request to the remote fragments, then
    /// terminates the base operator. The base is terminated even when the
    /// close request fails, so its cleanup is never skipped; the close-request
    /// error takes precedence in the returned status.
    Status terminate(RuntimeState* state) override {
        Status send_st = _send_close(state);
        Status base_st = Base::terminate(state);
        if (!send_st.ok()) {
            return send_st;
        }
        return base_st;
    }

    /// Sends the one-time close request to the remote fragments, then closes
    /// the base operator. The base is closed even when the close request
    /// fails; the close-request error takes precedence in the returned status.
    Status close(RuntimeState* state) override {
        Status send_st = _send_close(state);
        Status base_st = Base::close(state);
        if (!send_st.ok()) {
            return send_st;
        }
        return base_st;
    }

    /// Only records the child; Base::set_child is intentionally not called.
    Status set_child(OperatorPtr child) override {
        Base::_child = child;
        return Status::OK();
    }

    bool is_serial_operator() const override { return true; }

    DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
        return {ExchangeType::NOOP};
    }

    /// Produces the next output block or advances the recursion.
    ///
    /// When the shared state is not ready to return, `block` is not filled:
    /// the call fails with Aborted if the next round would exceed
    /// `_max_recursion_depth`. Otherwise it blocks `source_dep` and runs one
    /// recursion round, passing the previous `last_round_offset` on to
    /// send_data_to_targets(). It then bumps `current_round` and records the
    /// pre-round buffer size as the new `last_round_offset`.
    ///
    /// When ready to return, it swaps the last buffered block into `block`,
    /// filters it with this operator's conjuncts and drops it from the
    /// buffer. It sets `*eos` once the buffer is empty.
    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override {
        auto& local_state = get_local_state(state);
        auto& ctx = local_state._shared_state;
        ctx->update_ready_to_return();
        if (!ctx->ready_to_return) {
            if (ctx->current_round + 1 > _max_recursion_depth) {
                return Status::Aborted("reach cte_max_recursion_depth {}", _max_recursion_depth);
            }
            ctx->source_dep->block();
            // ctx->blocks.size() may be changed after _recursive_process
            int current_blocks_size = int(ctx->blocks.size());
            RETURN_IF_ERROR(_recursive_process(state, ctx->last_round_offset));
            ctx->current_round++;
            ctx->last_round_offset = current_blocks_size;
        } else {
            if (ctx->blocks.empty()) {
                *eos = true;
            } else {
                block->swap(ctx->blocks.back());
                RETURN_IF_ERROR(local_state.filter_block(local_state.conjuncts(), block));
                ctx->blocks.pop_back();
            }
        }
        return Status::OK();
    }

    bool is_source() const override { return true; }

private:
    /// Sends the `close` stage to every fragment in `_fragments_to_reset` at
    /// most once per operator. It also records `current_round` (the number of
    /// recursion rounds run by get_block()) in the "RecursiveRound" profile
    /// counter. It is skipped entirely when this CTE is used by another
    /// recursive CTE.
    Status _send_close(RuntimeState* state) {
        if (_already_send_close || _is_used_by_other_rec_cte) {
            return Status::OK();
        }
        // Mark as sent before the RPCs so that close() and terminate() never
        // issue the request twice, even when the first attempt fails.
        _already_send_close = true;
        auto& local_state = get_local_state(state);
        auto* round_counter =
                ADD_COUNTER(local_state.Base::custom_profile(), "RecursiveRound", TUnit::UNIT);
        round_counter->set(int64_t(local_state._shared_state->current_round));
        return _send_rerun_fragments(state, PRerunFragmentParams::close);
    }

    /// Runs one recursion round. Every fragment in `_fragments_to_reset` goes
    /// through the wait, release, rebuild and submit stages, and the
    /// recursive-side global runtime filters are reset between wait and
    /// release. Afterwards the shared state sends data to the targets,
    /// starting at `last_round_offset`. Stops at the first error.
    Status _recursive_process(RuntimeState* state, size_t last_round_offset) const {
        RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::wait));
        RETURN_IF_ERROR(_send_reset_global_rf(state));
        RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::release));
        RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::rebuild));
        RETURN_IF_ERROR(_send_rerun_fragments(state, PRerunFragmentParams::submit));
        RETURN_IF_ERROR(get_local_state(state)._shared_state->send_data_to_targets(
                state, last_round_offset));
        return Status::OK();
    }

    /// Synchronously sends a reset request for the filter ids in
    /// `_global_rf_ids` to the address returned by get_merge_addr(). It fails
    /// on a missing brpc stub, an RPC transport failure, or a non-OK status
    /// in the response.
    Status _send_reset_global_rf(RuntimeState* state) const {
        TNetworkAddress addr;
        RETURN_IF_ERROR(state->global_runtime_filter_mgr()->get_merge_addr(&addr));
        auto stub =
                state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(addr);
        if (stub == nullptr) {
            return Status::InternalError("failed to get brpc client for {}:{}", addr.hostname,
                                         addr.port);
        }
        PResetGlobalRfParams request;
        request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto());
        for (auto filter_id : _global_rf_ids) {
            request.add_filter_ids(filter_id);
        }
        PResetGlobalRfResult result;
        brpc::Controller controller;
        controller.set_timeout_ms(
                get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout()));
        stub->reset_global_rf(&controller, &request, &result, brpc::DoNothing());
        brpc::Join(controller.call_id());
        // Without this check a transport failure (e.g. timeout) would fall
        // through to `result.status()`, which the server never filled in.
        if (controller.Failed()) {
            return Status::InternalError(controller.ErrorText());
        }
        return Status::create(result.status());
    }

    /// Sends `stage` to every fragment in `_fragments_to_reset`, one
    /// synchronous RPC at a time. It stops at the first missing stub,
    /// transport failure, or non-OK response status.
    Status _send_rerun_fragments(RuntimeState* state, PRerunFragmentParams_Opcode stage) const {
        for (const auto& fragment : _fragments_to_reset) {
            auto stub =
                    state->get_query_ctx()->exec_env()->brpc_internal_client_cache()->get_client(
                            fragment.addr);
            if (stub == nullptr) {
                return Status::InternalError("failed to get brpc client for {}:{}",
                                             fragment.addr.hostname, fragment.addr.port);
            }
            PRerunFragmentParams request;
            request.mutable_query_id()->CopyFrom(UniqueId(state->query_id()).to_proto());
            request.set_fragment_id(fragment.fragment_id);
            request.set_stage(stage);
            PRerunFragmentResult result;
            brpc::Controller controller;
            controller.set_timeout_ms(
                    get_execution_rpc_timeout_ms(state->get_query_ctx()->execution_timeout()));
            stub->rerun_fragment(&controller, &request, &result, brpc::DoNothing());
            brpc::Join(controller.call_id());
            if (controller.Failed()) {
                return Status::InternalError(controller.ErrorText());
            }
            auto rpc_st = Status::create(result.status());
            if (!rpc_st.ok()) {
                return rpc_st;
            }
        }
        return Status::OK();
    }

    friend class RecCTESourceLocalState;

    // Not referenced in this header; presumably used by the .cpp
    // implementation — confirm before removing.
    std::vector<vectorized::DataTypePtr> _hash_table_key_types;
    // Copied from rec_cte_node.is_union_all.
    const bool _is_union_all = false;
    // Copied from rec_cte_node.targets; not referenced in this header.
    std::vector<TRecCTETarget> _targets;
    // Fragments that receive the rerun-stage RPCs (wait/release/rebuild/
    // submit/close).
    std::vector<TRecCTEResetInfo> _fragments_to_reset;
    // Global runtime-filter ids reset before every recursion round.
    std::vector<int> _global_rf_ids;
    // Upper bound on recursion rounds; not assigned in this header —
    // presumably set in init()/prepare(), confirm there.
    int _max_recursion_depth = 0;
    // Guards _send_close() so the close stage is requested at most once.
    bool _already_send_close = false;
    // When true, _send_close() sends nothing.
    bool _is_used_by_other_rec_cte = false;
};
} // namespace pipeline
#include "common/compile_check_end.h"
} // namespace doris