// blob: 6a84c6fc2b0e4593c0ff5a091077018e61996b74 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <atomic>
#include <functional>
#include <utility>
#include "exec/partitioner/partitioner.h"
#include "runtime/fragment_mgr.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/query_context.h"
#include "runtime/runtime_profile.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
namespace doris {
#include "common/compile_check_begin.h"
// Default spill partitioner, used for the initial (level-0) partitioning.
// It is Crc32HashPartitioner instantiated with the SpillPartitionChannelIds
// channel-id policy. Repartition paths may use a different channel-id policy
// (e.g. raw-hash mode).
using SpillPartitionerType = Crc32HashPartitioner<SpillPartitionChannelIds>;
// Partitioner used on the repartition path. It is Crc32HashPartitioner
// instantiated with the SpillRePartitionChannelIds channel-id policy, which
// keeps the raw hash (no final modulo) so that SpillRepartitioner can apply
// level-aware hash mixing and channel mapping itself.
using SpillRePartitionerType = Crc32HashPartitioner<SpillRePartitionChannelIds>;
// Bookkeeping shared by a group of spill tasks: an atomic count of tasks that
// have not reported completion yet, the id of the owning query, and a callback
// fired when the count drops from 1 to 0.
struct SpillContext {
    // Number of tasks that have not yet called on_task_finished().
    std::atomic_int running_tasks_count;
    // Id of the query; printed in the destructor's warning message.
    TUniqueId query_id;
    // Called with `this` by on_task_finished() when the counter value observed
    // before the decrement was exactly 1.
    std::function<void(SpillContext*)> all_tasks_finished_callback;

    // Stores the initial task count and takes ownership of the query id and
    // the completion callback.
    SpillContext(int running_tasks_count_, TUniqueId query_id_,
                 std::function<void(SpillContext*)> all_tasks_finished_callback_)
            : running_tasks_count(running_tasks_count_),
              query_id(std::move(query_id_)),
              all_tasks_finished_callback(std::move(all_tasks_finished_callback_)) {}

    // Logs a warning if the context is destroyed while the counter is non-zero.
    ~SpillContext() {
        if (running_tasks_count.load() == 0) {
            return;
        }
        LOG(WARNING) << "Query: " << print_id(query_id)
                     << " not all spill tasks finished, remaining tasks: "
                     << running_tasks_count.load();
    }

    // Atomically decrements the counter; the caller that observes the previous
    // value 1 (i.e. the last outstanding task) invokes the completion callback.
    void on_task_finished() {
        const bool was_last_task = running_tasks_count.fetch_sub(1) == 1;
        if (was_last_task) {
            all_tasks_finished_callback(this);
        }
    }
};
// Runs a spill operation synchronously on the calling thread.
//
// Historically SpillRunnable/SpillSinkRunnable/SpillRecoverRunnable wrappers
// tracked counters and optionally notified a SpillContext. Spill operations are
// now synchronous and callers maintain their own counters, so only this small
// helper remains.
//
// `exec_func` is invoked first; if it returns an error, that error is returned
// and `fin_cb` is not called. Otherwise `fin_cb` is invoked when it holds a
// callable, and its error (if any) is returned. Returns Status::OK() when every
// invoked callback succeeded. `state` is not used by this function.
inline Status run_spill_task(RuntimeState* state, std::function<Status()> exec_func,
                             std::function<Status()> fin_cb = {}) {
    RETURN_IF_ERROR(exec_func());
    if (!fin_cb) {
        // No finish callback supplied: nothing more to do.
        return Status::OK();
    }
    RETURN_IF_ERROR(fin_cb());
    return Status::OK();
}
// Mirrors the counter called `name` from `inner_profile` into `runtime_profile`.
//
// If `inner_profile` has no such counter, this trips a DCHECK and returns
// without touching `runtime_profile`. If `runtime_profile` has no such counter
// yet, one is added with the inner counter's type and level.
//
// When `accumulating` is true, the inner value is passed to update() on the
// destination counter, except for TUnit::BYTES counters, which are overwritten
// with set() because memory usage should not be accumulated. When
// `accumulating` is false, the destination is always overwritten with set().
template <bool accumulating>
inline void update_profile_from_inner_profile(const std::string& name,
                                              RuntimeProfile* runtime_profile,
                                              RuntimeProfile* inner_profile) {
    auto* source = inner_profile->get_counter(name);
    DCHECK(source != nullptr) << "inner counter " << name << " not found";
    if (source == nullptr) [[unlikely]] {
        return;
    }

    auto* target = runtime_profile->get_counter(name);
    if (target == nullptr) [[unlikely]] {
        // First time this counter is seen on the outer profile: create it.
        target = runtime_profile->add_counter(name, source->type(), "", source->level());
    }

    if constexpr (accumulating) {
        // Only non-memory counters are accumulated.
        if (target->type() != TUnit::BYTES) {
            target->update(source->value());
            return;
        }
    }
    // Non-accumulating mode, or a BYTES counter: overwrite with the inner value.
    target->set(source->value());
}
#include "common/compile_check_end.h"
} // namespace doris