blob: 6a563cc9a68bd33bbd86401276acf917b9dac25e [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gen_cpp/BackendService_types.h>
#include <stddef.h>
#include <stdint.h>
#include <atomic>
#include <memory>
#include <queue>
#include <shared_mutex>
#include <string>
#include <unordered_set>
#include "common/factory_creator.h"
#include "common/status.h"
#include "service/backend_options.h"
#include "util/hash_util.hpp"
namespace doris {
class MemTrackerLimiter;
class RuntimeProfile;
class ThreadPool;
class ExecEnv;
class CgroupCpuCtl;
class QueryContext;
class IOThrottle;
class ResourceContext;
namespace vectorized {
class ScannerScheduler;
}
namespace pipeline {
class TaskScheduler;
} // namespace pipeline
class WorkloadGroup;
struct WorkloadGroupInfo;
struct TrackerLimiterGroup;
class WorkloadGroupMetrics;
class WorkloadGroup : public std::enable_shared_from_this<WorkloadGroup> {
ENABLE_FACTORY_CREATOR(WorkloadGroup);
public:
WorkloadGroup(const WorkloadGroupInfo& wg_info);
virtual ~WorkloadGroup();
int64_t version() const { return _version; }
uint64_t id() const { return _id; }
std::string name() const { return _name; };
int64_t memory_limit() const {
std::shared_lock<std::shared_mutex> r_lock(_mutex);
return _memory_limit;
}
int64_t total_mem_used() const { return _total_mem_used; }
// make memory snapshots and refresh total memory used at the same time.
int64_t refresh_memory_usage();
int64_t memory_used();
void do_sweep();
#ifdef BE_TEST
void clear_cancelled_resource_ctx();
#endif
int memory_low_watermark() const {
return _memory_low_watermark.load(std::memory_order_relaxed);
}
int memory_high_watermark() const {
return _memory_high_watermark.load(std::memory_order_relaxed);
}
int total_query_slot_count() const {
return _total_query_slot_count.load(std::memory_order_relaxed);
}
void add_wg_refresh_interval_memory_growth(int64_t size) {
_wg_refresh_interval_memory_growth.fetch_add(size);
}
bool try_add_wg_refresh_interval_memory_growth(int64_t size);
void sub_wg_refresh_interval_memory_growth(int64_t size) {
_wg_refresh_interval_memory_growth.fetch_sub(size);
}
int64_t wg_refresh_interval_memory_growth() {
return _wg_refresh_interval_memory_growth.load();
}
void check_mem_used(bool* is_low_watermark, bool* is_high_watermark) const {
auto realtime_total_mem_used = _total_mem_used + _wg_refresh_interval_memory_growth.load();
*is_low_watermark = (realtime_total_mem_used >
((double)_memory_limit *
_memory_low_watermark.load(std::memory_order_relaxed) / 100));
*is_high_watermark = (realtime_total_mem_used >
((double)_memory_limit *
_memory_high_watermark.load(std::memory_order_relaxed) / 100));
}
std::string debug_string() const;
std::string memory_debug_string() const;
void check_and_update(const WorkloadGroupInfo& tg_info);
TWgSlotMemoryPolicy::type slot_memory_policy() const { return _slot_mem_policy; }
bool exceed_limit() {
std::shared_lock<std::shared_mutex> r_lock(_mutex);
return _memory_limit > 0 ? _total_mem_used > _memory_limit : false;
}
int64_t min_memory_limit() const { return _min_memory_limit; }
Status add_resource_ctx(TUniqueId query_id, std::shared_ptr<ResourceContext> resource_ctx) {
std::unique_lock<std::shared_mutex> wlock(_mutex);
if (_is_shutdown) {
// If the workload group is set shutdown, then should not run any more,
// because the scheduler pool and other pointer may be released.
return Status::InternalError(
"Failed add task to wg {}, the workload group is shutdown. host: {}", _id,
BackendOptions::get_localhost());
}
_resource_ctxs.insert({query_id, resource_ctx});
return Status::OK();
}
void shutdown() {
std::unique_lock<std::shared_mutex> wlock(_mutex);
_is_shutdown = true;
}
bool can_be_dropped() {
std::shared_lock<std::shared_mutex> r_lock(_mutex);
return _is_shutdown && _resource_ctxs.empty();
}
std::unordered_map<TUniqueId, std::weak_ptr<ResourceContext>> resource_ctxs() {
std::shared_lock<std::shared_mutex> r_lock(_mutex);
return _resource_ctxs;
}
Status upsert_task_scheduler(WorkloadGroupInfo* tg_info);
virtual void get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
vectorized::ScannerScheduler** scan_sched,
vectorized::ScannerScheduler** remote_scan_sched);
void try_stop_schedulers();
std::string thread_debug_info();
std::shared_ptr<IOThrottle> get_local_scan_io_throttle(const std::string& disk_dir);
std::shared_ptr<IOThrottle> get_remote_scan_io_throttle();
void upsert_scan_io_throttle(WorkloadGroupInfo* tg_info);
void update_cpu_time(int64_t delta_cpu_time);
void update_local_scan_io(std::string path, size_t scan_bytes);
void update_remote_scan_io(size_t scan_bytes);
int64_t get_mem_used();
virtual ThreadPool* get_memtable_flush_pool() {
// no lock here because this is called by memtable flush,
// to avoid lock competition with the workload thread pool's update
return _memtable_flush_pool.get();
}
void create_cgroup_cpu_ctl();
std::weak_ptr<CgroupCpuCtl> get_cgroup_cpu_ctl_wptr();
std::shared_ptr<WorkloadGroupMetrics> get_metrics() { return _wg_metrics; }
friend class WorkloadGroupMetrics;
friend class WorkloadGroupMgr;
int64_t revoke_memory(int64_t need_free_mem, const std::string& revoke_reason,
RuntimeProfile* profile);
private:
void set_id(uint64_t wg_id) { _id = wg_id; }
void create_cgroup_cpu_ctl_no_lock();
void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info);
Status upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
std::shared_ptr<CgroupCpuCtl> cg_cpu_ctl_ptr);
std::string _memory_debug_string() const;
mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit
uint64_t _id;
std::string _name;
int64_t _version;
std::atomic<int> _min_cpu_percent = 0;
std::atomic<int> _max_cpu_percent = 100;
std::atomic<int64_t> _memory_limit = 1 << 30; // Default to 1GB
std::atomic<int64_t> _min_memory_limit = 1 << 26; // Default to 64MB
std::atomic<int> _min_memory_percent = 0;
std::atomic<int> _max_memory_percent = 100;
std::atomic<int> _memory_low_watermark;
std::atomic<int> _memory_high_watermark;
std::atomic<int> _scan_thread_num;
std::atomic<int> _max_remote_scan_thread_num;
std::atomic<int> _min_remote_scan_thread_num;
std::atomic<int64_t> _scan_bytes_per_second {-1};
std::atomic<int64_t> _remote_scan_bytes_per_second {-1};
std::atomic<int> _total_query_slot_count = 0;
std::atomic<TWgSlotMemoryPolicy::type> _slot_mem_policy {TWgSlotMemoryPolicy::NONE};
std::atomic_int64_t _total_mem_used = 0; // bytes
std::atomic_int64_t _wg_refresh_interval_memory_growth;
// means workload group is mark dropped
// new query can not submit
// waiting running query to be cancelled or finish
bool _is_shutdown = false;
std::unordered_map<TUniqueId, std::weak_ptr<ResourceContext>> _resource_ctxs;
std::shared_mutex _task_sched_lock;
// _cgroup_cpu_ctl not only used by threadpool which managed by WorkloadGroup,
// but also some global background threadpool which not owned by WorkloadGroup,
// so it should be shared ptr;
std::shared_ptr<CgroupCpuCtl> _cgroup_cpu_ctl {nullptr};
std::unique_ptr<doris::pipeline::TaskScheduler> _task_sched {nullptr};
std::unique_ptr<vectorized::ScannerScheduler> _scan_task_sched {nullptr};
std::unique_ptr<vectorized::ScannerScheduler> _remote_scan_task_sched {nullptr};
std::unique_ptr<ThreadPool> _memtable_flush_pool {nullptr};
std::map<std::string, std::shared_ptr<IOThrottle>> _scan_io_throttle_map;
std::shared_ptr<IOThrottle> _remote_scan_io_throttle {nullptr};
std::shared_ptr<WorkloadGroupMetrics> _wg_metrics {nullptr};
};
using WorkloadGroupPtr = std::shared_ptr<WorkloadGroup>;
struct WorkloadGroupInfo {
const uint64_t id = 0;
const std::string name = "";
const int64_t version = 0;
const int min_cpu_percent = 0;
const int max_cpu_percent = 100;
const int64_t memory_limit = 1 << 30; // Default to 1GB
const int min_memory_percent = 0;
const int max_memory_percent = 100;
const int memory_low_watermark = 0;
const int memory_high_watermark = 0;
const int scan_thread_num = 0;
const int max_remote_scan_thread_num = 0;
const int min_remote_scan_thread_num = 0;
const int64_t read_bytes_per_second = -1;
const int64_t remote_read_bytes_per_second = -1;
const int total_query_slot_count = 0;
const TWgSlotMemoryPolicy::type slot_mem_policy = TWgSlotMemoryPolicy::NONE;
// log cgroup cpu info
uint64_t cgroup_cpu_shares = 0;
int cgroup_cpu_hard_limit = 0;
const bool valid = true;
const int pipeline_exec_thread_num = 0;
const int blocking_pipeline_exec_thread_num = 0;
const int max_flush_thread_num = 0;
const int min_flush_thread_num = 0;
static WorkloadGroupInfo parse_topic_info(const TWorkloadGroupInfo& tworkload_group_info);
};
} // namespace doris