// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <glog/logging.h>
#include <stddef.h>
#include <stdint.h>

#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <ostream>
#include <queue>
#include <set>

#include "common/status.h"
#include "pipeline_task.h"
#include "runtime/task_group/task_group.h"

namespace doris {
namespace pipeline {

class TaskQueue {
public:
    explicit TaskQueue(size_t core_size) : _core_size(core_size) {}
    virtual ~TaskQueue();
    virtual void close() = 0;

    // Take a runnable task for the worker bound to core_id.
    // TODO: reconsider whether per-core dispatch is still useful.
    virtual PipelineTask* take(size_t core_id) = 0;

    // Push from the scheduler, with no core affinity.
    virtual Status push_back(PipelineTask* task) = 0;

    // Push from a worker thread that is already bound to core_id.
    virtual Status push_back(PipelineTask* task, size_t core_id) = 0;

    virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}

    virtual void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
                                     taskgroup::TGPTEntityPtr entity) = 0;

    int cores() const { return _core_size; }

protected:
    size_t _core_size;
    static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
};
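
// A minimal sketch of how a worker thread might drive this interface. The real
// loop lives in the scheduler, not here, and `do_work` / `is_finished` are
// hypothetical stand-ins used only for illustration; take() is assumed to
// return nullptr once the queue is closed.
//
//   void worker_loop(TaskQueue* queue, size_t core_id) {
//       while (PipelineTask* task = queue->take(core_id)) {
//           int64_t time_spent_ns = do_work(task); // hypothetical helper
//           queue->update_statistics(task, time_spent_ns);
//           if (!is_finished(task)) { // hypothetical check
//               // Re-queue on the same core to keep cache locality.
//               static_cast<void>(queue->push_back(task, core_id));
//           }
//       }
//   }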

class SubTaskQueue {
    friend class PriorityTaskQueue;

public:
    void push_back(PipelineTask* task) { _queue.emplace(task); }

    PipelineTask* try_take(bool is_steal);

    void set_level_factor(double level_factor) { _level_factor = level_factor; }

    // Note:
    // runtime is the real time consumed by actually executing tasks from this queue;
    // vruntime (virtual runtime) = runtime / _level_factor.
    double get_vruntime() { return _runtime / _level_factor; }

    void inc_runtime(uint64_t delta_time) { _runtime += delta_time; }

    void adjust_runtime(uint64_t vruntime) { this->_runtime = vruntime * _level_factor; }

    bool empty() { return _queue.empty(); }

private:
    std::queue<PipelineTask*> _queue;
    // derived from LEVEL_QUEUE_TIME_FACTOR by PriorityTaskQueue
    double _level_factor = 1;

    std::atomic<uint64_t> _runtime = 0;
};
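
// A worked example of the bookkeeping above: if this sub-queue's _level_factor
// is 4 (the factors are derived from LEVEL_QUEUE_TIME_FACTOR by
// PriorityTaskQueue below), then after inc_runtime(8'000'000'000), i.e. 8s of
// real execution, get_vruntime() reports 2'000'000'000, i.e. 2s. A queue with
// a larger factor therefore accumulates vruntime more slowly, so it can
// consume more real CPU time before the scheduler considers it "expensive".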

// A multilevel feedback queue: tasks sink toward lower-priority sub-queues as
// their accumulated runtime crosses each threshold in _queue_level_limit.
class PriorityTaskQueue {
public:
    PriorityTaskQueue();

    void close();

    PipelineTask* try_take(bool is_steal);

    PipelineTask* take(uint32_t timeout_ms = 0);

    Status push(PipelineTask* task);

    void inc_sub_queue_runtime(int level, uint64_t runtime) {
        _sub_queues[level].inc_runtime(runtime);
    }

    int task_size();

private:
    PipelineTask* _try_take_unprotected(bool is_steal);
    static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
    static constexpr size_t SUB_QUEUE_LEVEL = 6;
    SubTaskQueue _sub_queues[SUB_QUEUE_LEVEL];
    // Level thresholds in nanoseconds: 1s, 3s, 10s, 60s, 300s.
    uint64_t _queue_level_limit[SUB_QUEUE_LEVEL - 1] = {1000000000, 3000000000, 10000000000,
                                                        60000000000, 300000000000};
    std::mutex _work_size_mutex;
    std::condition_variable _wait_task;
    std::atomic<size_t> _total_task_size = 0;
    bool _closed;

    // Minimum vruntime across the sub-queues, used to adjust a sub-queue's
    // vruntime; protected by _work_size_mutex.
    uint64_t _queue_level_min_vruntime = 0;

    int _compute_level(uint64_t real_runtime);
};
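
// A hedged sketch of the level mapping implied by _queue_level_limit. The real
// _compute_level is defined in the .cpp file, so this is an assumption about
// its shape, not a copy of it: runtime below 1s maps to level 0, 1s..3s to
// level 1, and so on, with anything past 300s landing in the last level.
//
//   int compute_level_sketch(uint64_t real_runtime_ns, const uint64_t* limits,
//                            size_t n_limits) {
//       for (size_t i = 0; i < n_limits; ++i) {
//           if (real_runtime_ns < limits[i]) {
//               return static_cast<int>(i); // still under this threshold
//           }
//       }
//       return static_cast<int>(n_limits); // beyond the last threshold
//   }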

// TODO: take NUMA architecture into account.
class MultiCoreTaskQueue : public TaskQueue {
public:
    explicit MultiCoreTaskQueue(size_t core_size);

    ~MultiCoreTaskQueue() override;

    void close() override;

    // Take a runnable task for the worker bound to core_id.
    // TODO: reconsider whether per-core dispatch is still useful.
    PipelineTask* take(size_t core_id) override;

    // TODO: combine these two methods into `push_back(task, core_id = -1)`.
    Status push_back(PipelineTask* task) override;

    Status push_back(PipelineTask* task, size_t core_id) override;

    void update_statistics(PipelineTask* task, int64_t time_spent) override {
        task->inc_runtime_ns(time_spent);
        _prio_task_queue_list[task->get_core_id()].inc_sub_queue_runtime(task->get_queue_level(),
                                                                         time_spent);
    }

    void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
                             taskgroup::TGPTEntityPtr entity) override {
        LOG(FATAL) << "update_tg_cpu_share is not implemented for MultiCoreTaskQueue";
    }

private:
    // Steal a task from another core's queue when this core's own queue is empty.
    PipelineTask* _steal_take(size_t core_id);

    std::unique_ptr<PriorityTaskQueue[]> _prio_task_queue_list;

    std::atomic<size_t> _next_core = 0;
    std::atomic<bool> _closed;
};
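
// How the pieces fit: each core owns one PriorityTaskQueue, push_back without
// a core id presumably distributes tasks via the _next_core counter, and an
// idle worker falls back to _steal_take. A sketch of one plausible stealing
// scan, assuming simple round-robin probing (the actual definition is in the
// .cpp file and may differ):
//
//   PipelineTask* steal_sketch(PriorityTaskQueue* queues, size_t core_size,
//                              size_t my_core) {
//       for (size_t i = 1; i < core_size; ++i) {
//           size_t victim = (my_core + i) % core_size;
//           if (auto* task = queues[victim].try_take(/*is_steal=*/true)) {
//               return task;
//           }
//       }
//       return nullptr; // every other core's queue was empty too
//   }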

class TaskGroupTaskQueue : public TaskQueue {
public:
    explicit TaskGroupTaskQueue(size_t core_size);
    ~TaskGroupTaskQueue() override;

    void close() override;

    PipelineTask* take(size_t core_id) override;

    // Push from TaskScheduler or BlockedTaskScheduler.
    Status push_back(PipelineTask* task) override;

    // Push from a worker thread.
    Status push_back(PipelineTask* task, size_t core_id) override;

    void update_statistics(PipelineTask* task, int64_t time_spent) override;

    void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
                             taskgroup::TGPTEntityPtr entity) override;

private:
    template <bool from_executor>
    Status _push_back(PipelineTask* task);
    template <bool from_worker>
    void _enqueue_task_group(taskgroup::TGPTEntityPtr);
    void _dequeue_task_group(taskgroup::TGPTEntityPtr);
    taskgroup::TGPTEntityPtr _next_tg_entity();
    uint64_t _ideal_runtime_ns(taskgroup::TGPTEntityPtr tg_entity) const;
    void _update_min_tg();

    // Orders task-group entities by vruntime, like the CFS red-black tree of
    // sched_entity in the Linux kernel.
    struct TaskGroupSchedEntityComparator {
        bool operator()(const taskgroup::TGPTEntityPtr&, const taskgroup::TGPTEntityPtr&) const;
    };
    using ResourceGroupSet = std::set<taskgroup::TGPTEntityPtr, TaskGroupSchedEntityComparator>;
    ResourceGroupSet _group_entities;
    std::condition_variable _wait_task;
    std::mutex _rs_mutex;
    bool _closed = false;
    int _total_cpu_share = 0;
    std::atomic<taskgroup::TGPTEntityPtr> _min_tg_entity = nullptr;
    uint64_t _min_tg_v_runtime_ns = 0;
};
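
// The CFS-style intuition behind _ideal_runtime_ns and the comparator above:
// over one scheduling period, each task group should receive CPU time roughly
// proportional to its share of _total_cpu_share,
//
//   ideal_runtime ~= period * cpu_share(tg) / _total_cpu_share
//
// so with two groups holding shares of 100 and 300 (illustrative numbers),
// they are entitled to about 25% and 75% of each period, and take() keeps
// selecting the entity with the smallest vruntime, much as CFS picks the
// leftmost node of its red-black tree.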
} // namespace pipeline
} // namespace doris