[feature](executor) Add time sharing executor framework.
diff --git a/be/src/common/status.h b/be/src/common/status.h index 841d509..0a6c35e 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h
@@ -737,6 +737,20 @@
         std::forward<T>(res).value();                           \
     });
 
+// Test helper: evaluates `stmt` (an expression whose result exposes has_value(),
+// error() and value()), reports a failure through ASSERT_TRUE when it holds no
+// value, and otherwise yields the contained value. ASSERT_TRUE is not defined in
+// this header -- presumably GoogleTest's; confirm it is only expanded in test code.
+#define TEST_TRY(stmt)                                                                          \
+    ({                                                                                          \
+        auto&& res = (stmt);                                                                    \
+        using T = std::decay_t<decltype(res)>;                                                  \
+        if (!res.has_value()) [[unlikely]] {                                                    \
+            ASSERT_TRUE(res.has_value()) << "Expected success, but got error: " << res.error(); \
+        }                                                                                       \
+        std::forward<T>(res).value();                                                           \
+    })
+
 } // namespace doris
 
 // specify formatter for Status
diff --git a/be/src/vec/exec/executor/batch_split_scheduler.h b/be/src/vec/exec/executor/batch_split_scheduler.h new file mode 100644 index 0000000..c614428 --- /dev/null +++ b/be/src/vec/exec/executor/batch_split_scheduler.h
@@ -0,0 +1,142 @@ +//#pragma once +// +//#include <atomic> +//#include <memory> +//#include <mutex> +//#include <condition_variable> +//#include <thread> +//#include <queue> +// +//#include "common/status.h" +//#include "vec/exec/scan/split_source_connector.h" +//#include "vec/exec/scan/vfile_scanner.h" +//#include "runtime/runtime_state.h" +// +//namespace doris::vectorized { +// +//// BatchSplitScheduler 负责在独立线程中获取文件分片并动态创建 VFileScanner +//class BatchSplitScheduler { +//public: +// BatchSplitScheduler(std::shared_ptr<SplitSourceConnector> split_source, +// RuntimeState* state, +// const TFileScanNode& scan_node) +// : _split_source(split_source) +// , _state(state) +// , _scan_node(scan_node) +// , _running(false) +// , _stop(false) {} +// +// ~BatchSplitScheduler() { +// stop(); +// } +// +// Status start() { +// if (_running) { +// return Status::OK(); +// } +// _running = true; +// _stop = false; +// _fetch_thread = std::thread(&BatchSplitScheduler::_fetch_loop, this); +// return Status::OK(); +// } +// +// void stop() { +// if (!_running) { +// return; +// } +// { +// std::lock_guard<std::mutex> l(_lock); +// _stop = true; +// _cv.notify_one(); +// } +// if (_fetch_thread.joinable()) { +// _fetch_thread.join(); +// } +// _running = false; +// } +// +// // 获取一个可用的 scanner +// Status get_next_scanner(std::shared_ptr<VFileScanner>* scanner) { +// std::unique_lock<std::mutex> l(_lock); +// while (_scanners.empty() && !_stop && _running) { +// _cv.wait(l); +// } +// +// if (_stop || !_running) { +// return Status::Cancelled("BatchSplitScheduler stopped"); +// } +// +// if (!_scanners.empty()) { +// *scanner = _scanners.front(); +// _scanners.pop(); +// return Status::OK(); +// } +// +// return Status::EndOfFile("No more scanners"); +// } +// +// // 通知当前 scanner 已完成,可以获取新的分片 +// void notify_scanner_finished() { +// std::lock_guard<std::mutex> l(_lock); +// _scanner_finished = true; +// _cv.notify_one(); +// } +// +//private: +// void _fetch_loop() { +// while 
(true) { +// { +// std::unique_lock<std::mutex> l(_lock); +// _cv.wait(l, [this]() { +// return _stop || _scanner_finished || _scanners.empty(); +// }); +// if (_stop) { +// break; +// } +// _scanner_finished = false; +// } +// +// // 获取新的分片并创建 scanner +// bool has_next = false; +// TFileScanRange range; +// auto status = _split_source->get_next(&has_next, &range); +// if (!status.ok()) { +// LOG(WARNING) << "Failed to get next split: " << status; +// continue; +// } +// +// if (!has_next) { +// stop(); +// break; +// } +// +// // 创建新的 scanner +// auto scanner = std::make_shared<VFileScanner>(_state, _scan_node); +// status = scanner->init(_state, _scan_node, {range}, nullptr, nullptr); +// if (!status.ok()) { +// LOG(WARNING) << "Failed to init scanner: " << status; +// continue; +// } +// +// { +// std::lock_guard<std::mutex> l(_lock); +// _scanners.push(scanner); +// _cv.notify_one(); +// } +// } +// } +// +//private: +// std::shared_ptr<SplitSourceConnector> _split_source; +// RuntimeState* _state; +// TFileScanNode _scan_node; +// std::atomic<bool> _running; +// std::atomic<bool> _stop; +// std::atomic<bool> _scanner_finished{true}; // 初始为 true 以获取第一批分片 +// std::mutex _lock; +// std::condition_variable _cv; +// std::thread _fetch_thread; +// std::queue<std::shared_ptr<VFileScanner>> _scanners; +//}; +// +//} // namespace doris::vectorized
diff --git a/be/src/vec/exec/executor/listenable_future.h b/be/src/vec/exec/executor/listenable_future.h new file mode 100644 index 0000000..420dc31 --- /dev/null +++ b/be/src/vec/exec/executor/listenable_future.h
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <exception>
+#include <functional>
+#include <iostream>
+#include <memory>
+#include <mutex>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "common/status.h"
+#include "glog/logging.h"
+
+namespace doris {
+namespace vectorized {
+
+// Unit type used as the payload of futures that only signal completion.
+struct Void {
+    bool operator==(const Void&) const { return true; }
+    bool operator!=(const Void&) const { return false; }
+};
+
+template <typename T>
+class SharedListenableFuture;
+
+// A one-shot, mutex-protected future that can be completed with either a value
+// (set_value) or an error status (set_error). Only the first completion takes
+// effect; later ones are ignored. Callbacks registered before completion run on
+// the completing thread; callbacks registered afterwards run immediately on the
+// registering thread. T must be default-constructible and copyable.
+template <typename T>
+class ListenableFuture {
+public:
+    using Callback = std::function<void(const T&, const doris::Status&)>;
+
+    ListenableFuture() = default;
+
+    ListenableFuture(const ListenableFuture&) = delete;
+    ListenableFuture& operator=(const ListenableFuture&) = delete;
+
+    // Takes over the state and pending callbacks of `other`, leaving it not ready.
+    // `other` is locked while it is read so a concurrent completion is not torn.
+    ListenableFuture(ListenableFuture&& other) noexcept {
+        std::lock_guard<std::mutex> other_lock(other._mutex);
+        _ready = other._ready;
+        _value = std::move(other._value);
+        _status = std::move(other._status);
+        _callbacks = std::move(other._callbacks);
+        other._ready = false;
+    }
+
+    ListenableFuture& operator=(ListenableFuture&& other) noexcept {
+        if (this != &other) {
+            // std::scoped_lock acquires both mutexes with deadlock avoidance; taking
+            // them one after another could deadlock against a concurrent `b = move(a)`.
+            std::scoped_lock lock(_mutex, other._mutex);
+            _ready = other._ready;
+            _value = std::move(other._value);
+            _status = std::move(other._status);
+            _callbacks = std::move(other._callbacks);
+            other._ready = false;
+        }
+        return *this;
+    }
+
+    // Completes the future successfully with a copy of `value`.
+    void set_value(const T& value) { _execute(value, doris::Status::OK()); }
+
+    // Completes the future with `status` and a default-constructed value.
+    void set_error(const doris::Status& status) { _execute({}, status); }
+
+    // Registers `cb`. If the future is already completed, `cb` is invoked right
+    // away on the calling thread; otherwise it is queued until completion.
+    void add_callback(Callback cb) {
+        {
+            std::lock_guard<std::mutex> lock(_mutex);
+            if (!_ready) {
+                _callbacks.emplace_back(std::move(cb));
+                return;
+            }
+        }
+        // Run outside the lock so the callback may call back into this future
+        // (std::mutex is not recursive). _execute() never rewrites _value/_status
+        // once _ready is set, so reading them unlocked here matches what it does.
+        cb(_value, _status);
+    }
+
+    static ListenableFuture<T> create_ready(T value) {
+        ListenableFuture<T> future;
+        future.set_value(std::move(value));
+        return future;
+    }
+
+    // Convenience overload, available only for ListenableFuture<Void>.
+    template <typename U = T>
+    static typename std::enable_if<std::is_same<U, Void>::value, ListenableFuture<U>>::type
+    create_ready() {
+        ListenableFuture<U> future;
+        future.set_value(Void {});
+        return future;
+    }
+
+    // True once the future has been completed, with a value or an error.
+    bool is_ready() const {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return _ready;
+    }
+
+    // True once the future has been completed successfully.
+    bool is_done() const {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return _ready && _status.ok();
+    }
+
+    // True once the future has been completed with an error.
+    bool is_error() const {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return _ready && !_status.ok();
+    }
+
+    // Returns a copy of the current status (OK until an error is set). A copy is
+    // returned because a reference would outlive the lock that guards _status.
+    doris::Status get_status() const {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return _status;
+    }
+
+    // Blocks until the future is completed, then returns the value or the error.
+    Result<T> get() {
+        std::unique_lock<std::mutex> lock(_mutex);
+        _cv.wait(lock, [this] { return _ready; });
+        if (!_status.ok()) {
+            return unexpected(_status);
+        }
+        return _value;
+    }
+
+    SharedListenableFuture<T> share() && { return SharedListenableFuture<T>(std::move(*this)); }
+
+    friend class SharedListenableFuture<T>;
+
+private:
+    // Publishes the result exactly once, then runs the queued callbacks outside
+    // the lock and wakes every thread blocked in get().
+    void _execute(const T& value, const doris::Status& status) {
+        std::vector<Callback> pending_callbacks;
+        {
+            std::lock_guard<std::mutex> lock(_mutex);
+            if (_ready) {
+                return;
+            }
+
+            _value = value;
+            _status = status;
+            _ready = true;
+            pending_callbacks.swap(_callbacks);
+        }
+
+        for (auto& cb : pending_callbacks) {
+            cb(_value, _status);
+        }
+        _cv.notify_all();
+    }
+
+    mutable std::mutex _mutex;
+    std::condition_variable _cv;
+    bool _ready = false;
+    T _value {};
+    doris::Status _status = doris::Status::OK();
+    std::vector<Callback> _callbacks;
+};
+
+// Copyable handle to a ListenableFuture shared between producers and consumers.
+template <typename T>
+class SharedListenableFuture {
+public:
+    using Callback = typename ListenableFuture<T>::Callback;
+
+    SharedListenableFuture() : _impl(std::make_shared<ListenableFuture<T>>()) {}
+
+    SharedListenableFuture(const SharedListenableFuture&) = default;
+    SharedListenableFuture& operator=(const SharedListenableFuture&) = default;
+
+    SharedListenableFuture(SharedListenableFuture&&) = default;
+    SharedListenableFuture& operator=(SharedListenableFuture&&) = default;
+
+    explicit SharedListenableFuture(ListenableFuture<T>&& future)
+            : _impl(std::make_shared<ListenableFuture<T>>(std::move(future))) {}
+
+    explicit SharedListenableFuture(std::shared_ptr<ListenableFuture<T>> future_ptr)
+            : _impl(std::move(future_ptr)) {}
+
+    void add_callback(Callback cb) { return _impl->add_callback(std::move(cb)); }
+
+    bool is_ready() const { return _impl->is_ready(); }
+
+    bool is_done() const { return _impl->is_done(); }
+
+    bool is_error() const { return _impl->is_error(); }
+
+    doris::Status get_status() const { return _impl->get_status(); }
+
+    Result<T> get() { return _impl->get(); }
+
+    static SharedListenableFuture<T> create_ready(T value) {
+        return SharedListenableFuture<T>(ListenableFuture<T>::create_ready(std::move(value)));
+    }
+
+    void set_value(const T& value) { _impl->set_value(value); }
+
+    void set_error(const doris::Status& status) { _impl->set_error(status); }
+
+    template <typename U = T>
+    static typename std::enable_if<std::is_same<U, Void>::value, SharedListenableFuture<U>>::type
+    create_ready() {
+        return SharedListenableFuture<U>(ListenableFuture<U>::create_ready());
+    }
+
+private:
+    std::shared_ptr<ListenableFuture<T>> _impl;
+};
+
+namespace listenable_future {
+// Shared, already-completed Void future.
+inline SharedListenableFuture<Void> null_future =
+        SharedListenableFuture<Void>::create_ready(Void {});
+} // namespace listenable_future
+
+} // namespace vectorized
+} // namespace doris
+
+// Usage example:
+//
+//     SharedListenableFuture<int> future;
+//     future.add_callback([](const int& value, const doris::Status& status) {
+//         if (status.ok()) {
+//             LOG(INFO) << "Got value: " << value;
+//         } else {
+//             LOG(WARNING) << "Failed: " << status;
+//         }
+//     });
+//     future.set_value(42);               // runs the callback registered above
+//     Result<int> result = future.get();  // already completed, returns at once
diff --git a/be/src/vec/exec/executor/split_runner.h b/be/src/vec/exec/executor/split_runner.h new file mode 100644 index 0000000..49d8dd1 --- /dev/null +++ b/be/src/vec/exec/executor/split_runner.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <chrono>
+#include <memory>
+#include <string>
+
+#include "common/status.h"
+#include "vec/exec/executor/listenable_future.h"
+
+namespace doris {
+namespace vectorized {
+
+// Abstract unit of work handed to a TaskExecutor. The contract of each method is
+// defined by the implementations; the notes below are inferred from the
+// signatures only and should be confirmed against them.
+class SplitRunner {
+public:
+    virtual ~SplitRunner() = default;
+    virtual Status init() = 0;
+    // Takes a time budget and returns either an error or a future; presumably the
+    // future completes when the runner can be scheduled again -- TODO confirm.
+    virtual Result<SharedListenableFuture<Void>> process_for(std::chrono::nanoseconds duration) = 0;
+    virtual void close(const Status& status) = 0;
+    virtual bool is_finished() = 0;
+    virtual Status finished_status() = 0;
+    // Human-readable description of the runner (presumably for logging).
+    virtual std::string get_info() const = 0;
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/stats.h b/be/src/vec/exec/executor/stats.h new file mode 100644 index 0000000..c86961b --- /dev/null +++ b/be/src/vec/exec/executor/stats.h
@@ -0,0 +1,104 @@ +//// Licensed to the Apache Software Foundation (ASF) under one +//// or more contributor license agreements. See the NOTICE file +//// distributed with this work for additional information +//// regarding copyright ownership. The ASF licenses this file +//// to you under the Apache License, Version 2.0 (the +//// "License"); you may not use this file except in compliance +//// with the License. You may obtain a copy of the License at +//// +//// http://www.apache.org/licenses/LICENSE-2.0 +//// +//// Unless required by applicable law or agreed to in writing, +//// software distributed under the License is distributed on an +//// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//// KIND, either express or implied. See the License for the +//// specific language governing permissions and limitations +//// under the License. +// +//#pragma once +//#include <array> +//#include <atomic> +//#include <chrono> +//#include <cmath> +//#include <vector> +// +//namespace doris { +//namespace vectorized { +// +//class TimeStats { +//public: +// void add(std::chrono::nanoseconds duration) { +// totalTime.fetch_add(duration.count()); +// count.fetch_add(1); +// } +// +// double getAverage() const { +// auto total = totalTime.load(); +// auto cnt = count.load(); +// return cnt > 0 ? 
static_cast<double>(total) / cnt : 0.0; +// } +// +//private: +// std::atomic<int64_t> totalTime {0}; +// std::atomic<int64_t> count {0}; +//}; +// +//class TimeDistribution { +//public: +// void add(std::chrono::microseconds duration) { +// auto bucket = getBucket(duration.count()); +// buckets[bucket].fetch_add(1); +// count.fetch_add(1); +// } +// +//private: +// static constexpr size_t BUCKET_COUNT = 32; +// std::array<std::atomic<int64_t>, BUCKET_COUNT> buckets {}; +// std::atomic<int64_t> count {0}; +// +// static size_t getBucket(int64_t value) { +// return std::min(static_cast<size_t>(std::log2(static_cast<double>(value) + 1)), +// BUCKET_COUNT - 1); +// } +//}; +// +//class CounterStats { +//public: +// void add(int64_t value) { total.fetch_add(value); } +// +// int64_t getTotal() const { return total.load(); } +// +//private: +// std::atomic<int64_t> total {0}; +//}; +// +//class DistributionStats { +//public: +// void add(double value) { +// count.fetch_add(1); +// sum.fetch_add(value); +// updateMin(value); +// updateMax(value); +// } +// +//private: +// std::atomic<int64_t> count {0}; +// std::atomic<double> sum {0.0}; +// std::atomic<double> min {std::numeric_limits<double>::max()}; +// std::atomic<double> max {std::numeric_limits<double>::lowest()}; +// +// void updateMin(double value) { +// double current = min.load(); +// while (value < current && !min.compare_exchange_weak(current, value)) { +// } +// } +// +// void updateMax(double value) { +// double current = max.load(); +// while (value > current && !max.compare_exchange_weak(current, value)) { +// } +// } +//}; +// +//} // namespace vectorized +//} // namespace doris
diff --git a/be/src/vec/exec/executor/task_executor.h b/be/src/vec/exec/executor/task_executor.h new file mode 100644 index 0000000..e6bf89c --- /dev/null +++ b/be/src/vec/exec/executor/task_executor.h
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <chrono>
+#include <functional>
+#include <future>
+#include <memory>
+#include <optional>
+#include <vector>
+
+#include "vec/exec/executor/listenable_future.h"
+#include "vec/exec/executor/split_runner.h"
+#include "vec/exec/executor/task_id.h"
+
+namespace doris {
+namespace vectorized {
+
+class TaskHandle;
+
+// Abstract executor interface: tasks are created/registered through TaskHandle
+// objects and SplitRunners are enqueued against a task. Scheduling behaviour is
+// defined by the implementations and is not visible from this header.
+class TaskExecutor {
+public:
+    virtual ~TaskExecutor() = default;
+
+    virtual Status init() = 0;
+    virtual Status start() = 0;
+    virtual void stop() = 0;
+
+    virtual Result<std::shared_ptr<TaskHandle>> create_task(
+            const TaskId& task_id, std::function<double()> utilization_supplier,
+            int initial_split_concurrency,
+            std::chrono::nanoseconds split_concurrency_adjust_frequency,
+            std::optional<int> max_drivers_per_task) = 0;
+
+    virtual Status add_task(const TaskId& task_id, std::shared_ptr<TaskHandle> task_handle) = 0;
+
+    virtual Status remove_task(std::shared_ptr<TaskHandle> task_handle) = 0;
+
+    // On success yields a vector of futures -- presumably one per entry of `splits`,
+    // completed when that split finishes; TODO confirm with implementations.
+    virtual Result<std::vector<SharedListenableFuture<Void>>> enqueue_splits(
+            std::shared_ptr<TaskHandle> task_handle, bool intermediate,
+            const std::vector<std::shared_ptr<SplitRunner>>& splits) = 0;
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/task_handle.h b/be/src/vec/exec/executor/task_handle.h new file mode 100644 index 0000000..0193ab4 --- /dev/null +++ b/be/src/vec/exec/executor/task_handle.h
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+#include "vec/exec/executor/task_id.h"
+
+namespace doris {
+namespace vectorized {
+
+// Abstract handle for a task managed by a TaskExecutor; exposes the task's id
+// and whether the handle has been closed.
+class TaskHandle {
+public:
+    virtual ~TaskHandle() = default;
+    virtual Status init() = 0;
+    virtual bool is_closed() const = 0;
+    virtual TaskId task_id() const = 0;
+};
+
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/executor/task_id.h b/be/src/vec/exec/executor/task_id.h new file mode 100644 index 0000000..347bafe --- /dev/null +++ b/be/src/vec/exec/executor/task_id.h
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <cstddef>
+#include <functional>
+#include <memory>
+#include <string>
+#include <utility>
+
+namespace doris {
+namespace vectorized {
+
+// Value type wrapping the string identifier of a task; usable as a hash-map key
+// through the std::hash specialization below.
+class TaskId {
+public:
+    // Left implicit (non-explicit) to stay compatible with callers that pass a plain
+    // string. Takes the id by value and moves it in, so rvalue arguments are not copied.
+    TaskId(std::string id) : _id(std::move(id)) {}
+    std::string to_string() const { return _id; }
+    bool operator==(const TaskId& other) const { return _id == other._id; }
+
+private:
+    std::string _id;
+};
+
+} // namespace vectorized
+} // namespace doris
+
+namespace std {
+template <>
+struct hash<doris::vectorized::TaskId> {
+    size_t operator()(const doris::vectorized::TaskId& task_id) const {
+        return std::hash<std::string> {}(task_id.to_string());
+    }
+};
+} // namespace std
diff --git a/be/src/vec/exec/executor/ticker.h b/be/src/vec/exec/executor/ticker.h new file mode 100644 index 0000000..5b36e26 --- /dev/null +++ b/be/src/vec/exec/executor/ticker.h
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <atomic>
+#include <chrono>
+#include <cstdint>
+#include <mutex>
+
+namespace doris {
+namespace vectorized {
+
+// Abstract time source; only subclasses can be constructed.
+class Ticker {
+public:
+    virtual ~Ticker() = default;
+
+    /**
+     * Returns the number of nanoseconds since a fixed reference point
+     */
+    virtual int64_t read() const = 0;
+
+protected:
+    Ticker() = default;
+};
+
+// Ticker backed by std::chrono::steady_clock (monotonic; its epoch is unspecified,
+// so readings are only meaningful as differences).
+class SystemTicker : public Ticker {
+public:
+    int64_t read() const override {
+        return std::chrono::duration_cast<std::chrono::nanoseconds>(
+                       std::chrono::steady_clock::now().time_since_epoch())
+                .count();
+    }
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/time_sharing/multilevel_split_queue.cpp b/be/src/vec/exec/executor/time_sharing/multilevel_split_queue.cpp new file mode 100644 index 0000000..9434b6c --- /dev/null +++ b/be/src/vec/exec/executor/time_sharing/multilevel_split_queue.cpp
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/executor/time_sharing/multilevel_split_queue.h"
+
+#include <algorithm>
+#include <cmath>
+#include <numeric>
+#include <unordered_set>
+#include <utility>
+
+namespace doris {
+namespace vectorized {
+
+namespace {
+
+// Level count as a signed int so the level loops below do not compare a signed
+// index against the unsigned std::array::size().
+constexpr int LEVEL_COUNT = static_cast<int>(MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS.size());
+
+// Drops every split for which `should_remove` returns true. std::priority_queue has
+// no erase, so the queue is drained and the survivors are pushed into a fresh one.
+template <typename Queue, typename Predicate>
+void erase_splits_if(Queue& queue, Predicate should_remove) {
+    Queue kept;
+    while (!queue.empty()) {
+        auto current = queue.top();
+        queue.pop();
+        if (!should_remove(current)) {
+            kept.emplace(std::move(current));
+        }
+    }
+    queue.swap(kept);
+}
+
+} // namespace
+
+MultilevelSplitQueue::MultilevelSplitQueue(double level_time_multiplier)
+        : _level_time_multiplier(level_time_multiplier), _level_waiting_splits() {
+    for (int level = 0; level < LEVEL_COUNT; ++level) {
+        _level_scheduled_time[level].store(0);
+        _level_min_priority[level].store(-1);
+    }
+}
+
+// Returns the highest level whose LEVEL_THRESHOLD_SECONDS entry does not exceed the
+// scheduled time, i.e. level L while threshold[L] <= seconds < threshold[L + 1].
+int MultilevelSplitQueue::compute_level(int64_t scheduled_nanos) {
+    const double scheduled_seconds = static_cast<double>(scheduled_nanos) / 1000000000.0;
+
+    for (int level = 0; level < LEVEL_COUNT - 1; ++level) {
+        if (scheduled_seconds < LEVEL_THRESHOLD_SECONDS[level + 1]) {
+            return level;
+        }
+    }
+
+    return LEVEL_COUNT - 1;
+}
+
+/**
+ * 'Charges' the quanta run time to the task and to the level it belongs to in
+ * an effort to maintain the target thread utilization ratios between levels and to
+ * maintain fairness within a level.
+ *
+ * Consider an example split where a read hung for several minutes. This is either a bug
+ * or a failing dependency. In either case we do not want to charge the task too much,
+ * and we especially do not want to charge the level too much - i.e. cause other queries
+ * in this level to starve. For that reason the time charged to levels is capped at
+ * LEVEL_CONTRIBUTION_CAP per call.
+ *
+ * @return the new priority for the task
+ */
+Priority MultilevelSplitQueue::update_priority(const Priority& old_priority, int64_t quanta_nanos,
+                                               int64_t scheduled_nanos) {
+    int old_level = old_priority.level();
+    int new_level = compute_level(scheduled_nanos);
+
+    int64_t level_contribution = std::min(quanta_nanos, LEVEL_CONTRIBUTION_CAP);
+
+    if (old_level == new_level) {
+        _level_scheduled_time[old_level].fetch_add(level_contribution);
+        return Priority(old_level, old_priority.level_priority() + quanta_nanos);
+    }
+
+    int64_t remaining_level_contribution = level_contribution;
+    int64_t remaining_task_time = quanta_nanos;
+
+    // The quanta crossed one or more level boundaries: give each level passed through
+    // at most its own time span, then charge what is left to the new level.
+    for (int current_level = old_level; current_level < new_level; ++current_level) {
+        int64_t level_time_threshold =
+                static_cast<int64_t>((LEVEL_THRESHOLD_SECONDS[current_level + 1] -
+                                      LEVEL_THRESHOLD_SECONDS[current_level]) *
+                                     1e9);
+
+        int64_t time_accrued_to_level =
+                std::min(level_time_threshold, remaining_level_contribution);
+        _level_scheduled_time[current_level].fetch_add(time_accrued_to_level);
+        remaining_level_contribution -= time_accrued_to_level;
+        remaining_task_time -= time_accrued_to_level;
+    }
+
+    _level_scheduled_time[new_level].fetch_add(remaining_level_contribution);
+    int64_t new_level_min_priority = get_level_min_priority(new_level, scheduled_nanos);
+    return Priority(new_level, new_level_min_priority + remaining_task_time);
+}
+
+// Returns the minimum priority recorded for `level`, first initialising it to
+// `task_thread_usage_nanos` if it still holds the -1 "unset" marker.
+int64_t MultilevelSplitQueue::get_level_min_priority(int level, int64_t task_thread_usage_nanos) {
+    int64_t expected = -1;
+    _level_min_priority[level].compare_exchange_strong(expected, task_thread_usage_nanos);
+    return _level_min_priority[level].load();
+}
+
+void MultilevelSplitQueue::offer(std::shared_ptr<PrioritizedSplitRunner> split) {
+    split->set_ready();
+    int level = split->priority().level();
+    std::unique_lock<std::mutex> lock(_mutex);
+    _offer_locked(split, level, lock);
+}
+
+void MultilevelSplitQueue::_offer_locked(std::shared_ptr<PrioritizedSplitRunner> split, int level,
+                                         std::unique_lock<std::mutex>& lock) {
+    if (_level_waiting_splits[level].empty()) {
+        // The level had no waiting splits: reset its scheduled time to the current target
+        // (presumably so it is not unfairly favoured once it becomes busy again).
+        // update_priority() changes _level_scheduled_time without holding _mutex, so the
+        // load/fetch_add pair below can interleave with it and be slightly off. The error
+        // is bounded by the splits that complete during this computation, so it is benign.
+        int64_t level0_time = _get_level0_target_time(lock);
+        int64_t level_expected_time =
+                static_cast<int64_t>(level0_time / std::pow(_level_time_multiplier, level));
+        int64_t delta = level_expected_time - _level_scheduled_time[level].load();
+        _level_scheduled_time[level].fetch_add(delta);
+    }
+
+    _level_waiting_splits[level].push(split);
+    _not_empty.notify_all();
+}
+
+// Blocks until a split is available or interrupt() is called; returns nullptr
+// in the latter case.
+std::shared_ptr<PrioritizedSplitRunner> MultilevelSplitQueue::take() {
+    std::unique_lock<std::mutex> lock(_mutex);
+
+    while (!_interrupted) {
+        auto split = _poll_split(lock);
+        if (split) {
+            // update_level_priority() returning true presumably means the split moved to
+            // another level: requeue it under its current level and pick again.
+            if (split->update_level_priority()) {
+                _offer_locked(split, split->priority().level(), lock);
+                continue;
+            }
+            int selected_level = split->priority().level();
+            _level_min_priority[selected_level].store(split->priority().level_priority());
+            return split;
+        }
+
+        _not_empty.wait(lock);
+    }
+    return nullptr;
+}
+
+/**
+ * Attempts to give each level a target amount of scheduled time, which is configurable
+ * using _level_time_multiplier.
+ *
+ * This function selects the level that has the lowest ratio of actual to the target time
+ * with the objective of minimizing deviation from the target scheduled time. From this level,
+ * we pick the split with the lowest priority.
+ */
+std::shared_ptr<PrioritizedSplitRunner> MultilevelSplitQueue::_poll_split(
+        std::unique_lock<std::mutex>& lock) {
+    int64_t target_scheduled_time = _get_level0_target_time(lock);
+    double worst_ratio = 1.0;
+    int selected_level = -1;
+
+    for (int level = 0; level < LEVEL_COUNT; ++level) {
+        if (!_level_waiting_splits[level].empty()) {
+            int64_t level_time = _level_scheduled_time[level].load();
+            double ratio = (level_time == 0) ? 0
+                                             : static_cast<double>(target_scheduled_time) /
+                                                       (1.0 * level_time);
+
+            if (selected_level == -1 || ratio > worst_ratio) {
+                worst_ratio = ratio;
+                selected_level = level;
+            }
+        }
+        target_scheduled_time =
+                static_cast<int64_t>(std::round(target_scheduled_time / _level_time_multiplier));
+    }
+
+    if (selected_level == -1) {
+        return nullptr;
+    }
+
+    auto result = _level_waiting_splits[selected_level].top();
+    _level_waiting_splits[selected_level].pop();
+    return result;
+}
+
+void MultilevelSplitQueue::remove(std::shared_ptr<PrioritizedSplitRunner> split) {
+    std::lock_guard<std::mutex> lock(_mutex);
+
+    for (auto& level_queue : _level_waiting_splits) {
+        erase_splits_if(level_queue,
+                        [&split](const auto& candidate) { return candidate == split; });
+    }
+
+    if (std::all_of(_level_waiting_splits.begin(), _level_waiting_splits.end(),
+                    [](const auto& q) { return q.empty(); })) {
+        _not_empty.notify_all();
+    }
+}
+
+void MultilevelSplitQueue::remove_all(
+        const std::vector<std::shared_ptr<PrioritizedSplitRunner>>& splits) {
+    std::lock_guard<std::mutex> lock(_mutex);
+
+    std::unordered_set<std::shared_ptr<PrioritizedSplitRunner>> to_remove(splits.begin(),
+                                                                          splits.end());
+
+    for (auto& level_queue : _level_waiting_splits) {
+        erase_splits_if(level_queue, [&to_remove](const auto& candidate) {
+            return to_remove.count(candidate) > 0;
+        });
+    }
+
+    if (std::all_of(_level_waiting_splits.begin(), _level_waiting_splits.end(),
+                    [](const auto& q) { return q.empty(); })) {
+        _not_empty.notify_all();
+    }
+}
+
+size_t MultilevelSplitQueue::size() const {
+    std::lock_guard<std::mutex> lock(_mutex);
+    return std::accumulate(_level_waiting_splits.begin(), _level_waiting_splits.end(), size_t(0),
+                           [](size_t sum, const auto& queue) { return sum + queue.size(); });
+}
+
+// Makes current and future take() calls return nullptr.
+void MultilevelSplitQueue::interrupt() {
+    std::lock_guard<std::mutex> lock(_mutex);
+    _interrupted = true;
+    _not_empty.notify_all();
+}
+
+// Returns the largest value, over all levels L, of (scheduled time of L) scaled by
+// _level_time_multiplier^L, i.e. every level's time expressed in level-0 terms.
+// `lock` is not used in the body; presumably it shows that the caller holds _mutex.
+int64_t MultilevelSplitQueue::_get_level0_target_time(std::unique_lock<std::mutex>& lock) {
+    int64_t level0_target_time = _level_scheduled_time[0].load();
+    double current_multiplier = _level_time_multiplier;
+
+    for (int level = 0; level < LEVEL_COUNT; ++level) {
+        current_multiplier /= _level_time_multiplier;
+        int64_t level_time = _level_scheduled_time[level].load();
+        level0_target_time =
+                std::max(level0_target_time, static_cast<int64_t>(level_time / current_multiplier));
+    }
+    return level0_target_time;
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/time_sharing/multilevel_split_queue.h b/be/src/vec/exec/executor/time_sharing/multilevel_split_queue.h new file mode 100644 index 0000000..50eb6a3 --- /dev/null +++ b/be/src/vec/exec/executor/time_sharing/multilevel_split_queue.h
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <array>
+#include <atomic>
+#include <condition_variable>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <vector>
+
+#include "common/factory_creator.h"
+#include "vec/exec/executor/time_sharing/prioritized_split_runner.h"
+#include "vec/exec/executor/time_sharing/priority.h"
+
+namespace doris {
+namespace vectorized {
+
+// Comparator for the per-level std::priority_queue. It returns true when `a` has the
+// larger level priority, so top() yields the split with the smallest level priority;
+// equal priorities are ordered by worker id, smallest first.
+struct SplitRunnerComparator {
+    bool operator()(const std::shared_ptr<PrioritizedSplitRunner>& a,
+                    const std::shared_ptr<PrioritizedSplitRunner>& b) const {
+        const auto a_priority = a->priority().level_priority();
+        const auto b_priority = b->priority().level_priority();
+        if (a_priority != b_priority) {
+            return a_priority > b_priority;
+        }
+
+        return a->worker_id() > b->worker_id();
+    }
+};
+
+// Holds the waiting splits as one priority queue per level, together with per-level
+// scheduled-time and minimum-priority counters.
+class MultilevelSplitQueue {
+    ENABLE_FACTORY_CREATOR(MultilevelSplitQueue);
+
+public:
+    // One entry per level; the array size fixes the number of levels used below.
+    static constexpr std::array<int, 5> LEVEL_THRESHOLD_SECONDS = {0, 1, 10, 60, 300};
+    static constexpr int64_t LEVEL_CONTRIBUTION_CAP = 30 * 1000000000LL; // 30 seconds in nanos
+
+    explicit MultilevelSplitQueue(double level_time_multiplier);
+
+    static int compute_level(int64_t scheduled_nanos);
+
+    Priority update_priority(const Priority& old_priority, int64_t quanta_nanos,
+                             int64_t scheduled_nanos);
+    int64_t get_level_min_priority(int level, int64_t scheduled_nanos);
+    void offer(std::shared_ptr<PrioritizedSplitRunner> split);
+    std::shared_ptr<PrioritizedSplitRunner> take();
+    size_t size() const;
+    void remove(std::shared_ptr<PrioritizedSplitRunner> split);
+    void remove_all(const std::vector<std::shared_ptr<PrioritizedSplitRunner>>& splits);
+    void interrupt();
+
+    // Scheduled time recorded for `level`; `level` is not range-checked here.
+    int64_t level_scheduled_time(int level) const { return _level_scheduled_time[level].load(); }
+
+private:
+    int64_t _get_level0_target_time(std::unique_lock<std::mutex>& lock);
+    std::shared_ptr<PrioritizedSplitRunner> _poll_split(std::unique_lock<std::mutex>& lock);
+    void _offer_locked(std::shared_ptr<PrioritizedSplitRunner> split, int level,
+                       std::unique_lock<std::mutex>& lock);
+
+    const double _level_time_multiplier;
+
+    std::array<std::priority_queue<std::shared_ptr<PrioritizedSplitRunner>,
+                                   std::vector<std::shared_ptr<PrioritizedSplitRunner>>,
+                                   SplitRunnerComparator>,
+               LEVEL_THRESHOLD_SECONDS.size()>
+            _level_waiting_splits;
+
+    std::array<std::atomic<int64_t>, LEVEL_THRESHOLD_SECONDS.size()> _level_scheduled_time;
+    std::array<std::atomic<int64_t>, LEVEL_THRESHOLD_SECONDS.size()> _level_min_priority;
+
+    std::atomic<bool> _interrupted {false};
+    mutable std::mutex _mutex;
+    std::condition_variable _not_empty;
+
+    // std::array<std::shared_ptr<CounterStats>, LEVEL_THRESHOLD_SECONDS.size()>
+    //        _selected_level_counters;
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/time_sharing/prioritized_split_runner.cpp b/be/src/vec/exec/executor/time_sharing/prioritized_split_runner.cpp new file mode 100644 index 0000000..a9e97ec --- /dev/null +++ b/be/src/vec/exec/executor/time_sharing/prioritized_split_runner.cpp
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/executor/time_sharing/prioritized_split_runner.h"
+
+#include <fmt/format.h>
+
+#include <chrono>
+#include <functional>
+#include <thread>
+
+#include "vec/exec/executor/time_sharing/time_sharing_task_handle.h"
+
+namespace doris {
+namespace vectorized {
+
+std::atomic<int64_t> PrioritizedSplitRunner::_next_worker_id(0);
+
+PrioritizedSplitRunner::PrioritizedSplitRunner(std::shared_ptr<TimeSharingTaskHandle> task_handle,
+                                               int split_id,
+                                               std::shared_ptr<SplitRunner> split_runner,
+                                               std::shared_ptr<Ticker> ticker)
+        : _task_handle(std::move(task_handle)),
+          _split_id(split_id),
+          _worker_id(_next_worker_id.fetch_add(1, std::memory_order_relaxed)),
+          _split_runner(std::move(split_runner)),
+          _ticker(std::move(ticker)) {
+    update_level_priority();
+}
+
+//PrioritizedSplitRunner::PrioritizedSplitRunner(CounterStats& globalCpuTimeMicros,
+//                                               CounterStats& globalScheduledTimeMicros,
+//                                               TimeStats& blockedQuantaWallTime,
+//                                               TimeStats& unblockedQuantaWallTime)
+//        : _created_nanos(std::chrono::steady_clock::now().time_since_epoch().count()),
+//          _split_id(0),
+//          _worker_id(_next_worker_id.fetch_add(1, std::memory_order_relaxed)),
+//          _ticker(std::make_shared<SystemTicker>()),
+//          _global_cpu_time_micros(globalCpuTimeMicros),
+//          _global_scheduled_time_micros(globalScheduledTimeMicros),
+//          _blocked_quanta_wall_time(blockedQuantaWallTime),
+//          _unblocked_quanta_wall_time(unblockedQuantaWallTime) {
+//}
+
+Status PrioritizedSplitRunner::init() {
+    return _split_runner->init();
+}
+
+std::shared_ptr<TimeSharingTaskHandle> PrioritizedSplitRunner::task_handle() const {
+    return _task_handle;
+}
+
+SharedListenableFuture<Void> PrioritizedSplitRunner::finished_future() {
+    return _finished_future;
+}
+
+bool PrioritizedSplitRunner::is_closed() const {
+    return _closed.load();
+}
+
+// Closes the wrapped split runner; only the first call has an effect.
+void PrioritizedSplitRunner::close(const Status& status) {
+    if (!_closed.exchange(true)) {
+        _split_runner->close(status);
+    }
+}
+
+int64_t PrioritizedSplitRunner::created_nanos() const {
+    return _created_nanos;
+}
+
+// True when the wrapped runner finished, or this runner or its task handle is closed.
+bool PrioritizedSplitRunner::is_finished() {
+    bool finished = _split_runner->is_finished();
+    if (finished) {
+        _finished_future.set_value({});
+    }
+    return finished || _closed.load() || _task_handle->is_closed();
+}
+
+Status PrioritizedSplitRunner::finished_status() {
+    return _split_runner->finished_status();
+}
+
+int64_t PrioritizedSplitRunner::scheduled_nanos() const {
+    return _scheduled_nanos.load();
+}
+
+// Runs the wrapped split for one quantum and accounts the elapsed ticker time to this
+// split and to its task. Returns the future produced by process_for(), or its error.
+Result<SharedListenableFuture<Void>> PrioritizedSplitRunner::process() {
+    if (is_closed()) {
+        SharedListenableFuture<Void> future;
+        future.set_value({});
+        return future;
+    }
+
+    // Read the injected ticker (the clock set_ready() also uses) so that the wait time
+    // below is computed from two readings of the same clock.
+    const int64_t start_nanos = _ticker->read();
+    // compare_exchange_strong() overwrites its `expected` argument on failure, so every
+    // "initialize once from zero" update gets its own expectation variable.
+    int64_t expected_start = 0;
+    _start.compare_exchange_strong(expected_start, start_nanos);
+    int64_t expected_last_ready = 0;
+    _last_ready.compare_exchange_strong(expected_last_ready, start_nanos);
+    _process_calls.fetch_add(1);
+
+    _wait_nanos.fetch_add(start_nanos - _last_ready.load());
+
+    auto process_start_time = _ticker->read();
+    auto blocked = _split_runner->process_for(SPLIT_RUN_QUANTA);
+    if (!blocked.has_value()) {
+        return unexpected(std::move(blocked).error());
+    }
+    auto process_end_time = _ticker->read();
+    auto quanta_scheduled_nanos = process_end_time - process_start_time;
+
+    _scheduled_nanos.fetch_add(quanta_scheduled_nanos);
+
+    // _priority.store(_task_handle->add_scheduled_nanos(quanta_scheduled_nanos));
+
+    {
+        std::lock_guard<std::mutex> lock(_priority_mutex);
+        _priority = _task_handle->add_scheduled_nanos(quanta_scheduled_nanos);
+    }
+
+    return blocked;
+}
+
+void PrioritizedSplitRunner::set_ready() {
+    _last_ready.store(_ticker->read());
+}
+
+//bool PrioritizedSplitRunner::update_level_priority() {
+//    Priority new_priority = _task_handle->priority();
+//    Priority old_priority = _priority.exchange(new_priority);
+//    return new_priority.level() != old_priority.level();
+//}
+
+/**
+ * Updates the (potentially stale) priority value cached in this object.
+ * This should be called when this object is outside the queue.
+ *
+ * @return true if the level changed.
+ */
+bool PrioritizedSplitRunner::update_level_priority() {
+    std::lock_guard<std::mutex> lock(_priority_mutex);
+    Priority new_priority = _task_handle->priority();
+    Priority old_priority = _priority;
+    _priority = new_priority;
+    return new_priority.level() != old_priority.level();
+}
+
+//
+//void PrioritizedSplitRunner::reset_level_priority() {
+//    _priority.store(_task_handle->reset_level_priority());
+//}
+
+void PrioritizedSplitRunner::reset_level_priority() {
+    std::lock_guard<std::mutex> lock(_priority_mutex);
+    _priority = _task_handle->reset_level_priority();
+}
+
+int64_t PrioritizedSplitRunner::worker_id() const {
+    return _worker_id;
+}
+
+int PrioritizedSplitRunner::split_id() const {
+    return _split_id;
+}
+
+//Priority PrioritizedSplitRunner::priority() const {
+//    return _priority.load();
+//}
+
+Priority PrioritizedSplitRunner::priority() const {
+    std::lock_guard<std::mutex> lock(_priority_mutex);
+    return _priority;
+}
+
+std::string PrioritizedSplitRunner::get_info() const {
+    //    const auto now = std::chrono::steady_clock::now();
+    //    const int64_t current_time = now.time_since_epoch().count();
+    //    const int64_t wall_time = (current_time - _start.load()) / 1'000'000; // convert to milliseconds
+
+    //    return fmt::format(
+    //            "Split {:<15}-{} {} (start = {:.1f}, wall = {} ms, cpu = {} ms, wait = {} ms, calls = {})",
+    //            _task_handle->get_task_id(),          // assumed to return a string
+    //            _split_id,                            // integer
+    //            _split_runner->get_info(),            // requires SplitRunner to provide get_info()
+    //            _start.load() / 1'000'000.0,          // convert to milliseconds, one decimal place
+    //            wall_time,                            // elapsed time computed above
+    //            _global_cpu_time_micros.get() / 1000, // microseconds to milliseconds
+    //            _wait_nanos.load() / 1'000'000,       // nanoseconds to milliseconds
+    //            _process_calls.load()                 // number of process() calls
+    //    );
+    return "";
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/time_sharing/prioritized_split_runner.h b/be/src/vec/exec/executor/time_sharing/prioritized_split_runner.h new file mode 100644 index 0000000..b7e396b --- /dev/null +++ b/be/src/vec/exec/executor/time_sharing/prioritized_split_runner.h
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <atomic>
+#include <chrono>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include "common/factory_creator.h"
+#include "vec/exec/executor/listenable_future.h"
+#include "vec/exec/executor/split_runner.h"
+#include "vec/exec/executor/stats.h"
+#include "vec/exec/executor/ticker.h"
+#include "vec/exec/executor/time_sharing/priority.h"
+
+namespace doris {
+namespace vectorized {
+
+class TimeSharingTaskHandle;
+
+// Wraps a SplitRunner together with the task handle it belongs to, a cached Priority
+// and the timing counters that are updated every time process() runs.
+class PrioritizedSplitRunner : public std::enable_shared_from_this<PrioritizedSplitRunner> {
+    ENABLE_FACTORY_CREATOR(PrioritizedSplitRunner);
+
+public:
+    // Duration handed to SplitRunner::process_for() by each process() call.
+    static constexpr auto SPLIT_RUN_QUANTA = std::chrono::seconds(1);
+
+    PrioritizedSplitRunner(std::shared_ptr<TimeSharingTaskHandle> task_handle, int split_id,
+                           std::shared_ptr<SplitRunner> split_runner,
+                           std::shared_ptr<Ticker> ticker);
+
+    //    PrioritizedSplitRunner(CounterStats& globalCpuTimeMicros,
+    //                           CounterStats& globalScheduledTimeMicros,
+    //                           TimeStats& blockedQuantaWallTime, TimeStats& unblockedQuantaWallTime);
+
+    Status init();
+
+    virtual ~PrioritizedSplitRunner() = default;
+
+    std::shared_ptr<TimeSharingTaskHandle> task_handle() const;
+    SharedListenableFuture<Void> finished_future();
+    bool is_closed() const;
+    void close(const Status& status);
+    int64_t created_nanos() const;
+    bool is_finished();
+    Status finished_status();
+    int64_t scheduled_nanos() const;
+    Result<SharedListenableFuture<Void>> process();
+    void set_ready();
+    bool update_level_priority();
+    void reset_level_priority();
+    int64_t worker_id() const;
+    int split_id() const;
+    virtual Priority priority() const;
+
+    std::string get_info() const;
+
+private:
+    static std::atomic<int64_t> _next_worker_id;
+
+    const int64_t _created_nanos {std::chrono::steady_clock::now().time_since_epoch().count()};
+    std::shared_ptr<TimeSharingTaskHandle> _task_handle;
+    const int _split_id;
+    const int64_t _worker_id;
+    std::shared_ptr<SplitRunner> _split_runner;
+    std::shared_ptr<Ticker> _ticker;
+    SharedListenableFuture<Void> _finished_future {};
+
+    std::atomic<bool> _closed {false};
+    // std::atomic<Priority> _priority {Priority {0, 0}};
+    Priority _priority {0, 0};
+    // Guards _priority; mutable so that the const priority() accessor can lock it.
+    mutable std::mutex _priority_mutex;
+    std::atomic<int64_t> _last_ready {0};
+    std::atomic<int64_t> _start {0};
+    std::atomic<int64_t> _scheduled_nanos {0};
+    std::atomic<int64_t> _wait_nanos {0};
+    std::atomic<int> _process_calls {0};
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/time_sharing/priority.h b/be/src/vec/exec/executor/time_sharing/priority.h new file mode 100644 index 0000000..f26e879 --- /dev/null +++ b/be/src/vec/exec/executor/time_sharing/priority.h
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <atomic>
+#include <cstdint>
+#include <sstream>
+#include <string>
+
+namespace doris {
+namespace vectorized {
+
+/**
+ * Task (and split) priority is composed of a level and a within-level
+ * priority. Level decides which queue the split is placed in, while
+ * within-level priority decides which split is executed next in that level.
+ * <p>
+ * Tasks move from a lower to higher level as they exceed level thresholds
+ * of total scheduled time accrued to a task.
+ * <p>
+ * The priority within a level increases with the scheduled time accumulated
+ * in that level. This is necessary to achieve fairness when tasks acquire
+ * scheduled time at varying rates.
+ * <p>
+ * However, this priority is <b>not</b> equal to the task total accrued
+ * scheduled time. When a task graduates to a higher level, the level
+ * priority is set to the minimum current priority in the new level. This
+ * allows us to maintain instantaneous fairness in terms of scheduled time.
+ */
+class Priority {
+public:
+    Priority(int level, int64_t level_priority) : _level(level), _level_priority(level_priority) {}
+
+    int level() const { return _level; }
+    int64_t level_priority() const { return _level_priority; }
+
+    // Orders by level first; within the same level, by level priority.
+    bool operator<(const Priority& other) const {
+        if (_level != other._level) return _level < other._level;
+        return _level_priority < other._level_priority;
+    }
+
+    std::string to_string() const {
+        std::ostringstream os;
+        os << "Priority(level=" << _level << ", priority=" << _level_priority << ")";
+        return os.str();
+    }
+
+private:
+    int _level;
+    int64_t _level_priority;
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/time_sharing/split_concurrency_controller.h b/be/src/vec/exec/executor/time_sharing/split_concurrency_controller.h new file mode 100644 index 0000000..3c803cc --- /dev/null +++ b/be/src/vec/exec/executor/time_sharing/split_concurrency_controller.h
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <chrono>
+#include <cmath>
+#include <cstdint>
+#include <limits>
+#include <stdexcept>
+
+#include "common/factory_creator.h"
+#include "common/status.h"
+
+namespace doris::vectorized {
+
+// Tracks a target split concurrency and moves it up or down by one, based on the
+// reported utilization compared with TARGET_UTIL and on accumulated thread time.
+class SplitConcurrencyController {
+    ENABLE_FACTORY_CREATOR(SplitConcurrencyController);
+
+public:
+    SplitConcurrencyController(int initial_concurrency,
+                               std::chrono::nanoseconds adjustment_interval)
+            : _adjustment_interval_nanos(adjustment_interval),
+              _target_concurrency(initial_concurrency) {}
+
+    // Adds `nanos` to the time accumulated since the last adjustment and raises the
+    // target by one when the interval elapsed, utilization is low and the target is in use.
+    void update(uint64_t nanos, double utilization, int current_concurrency) {
+        _validate_args(nanos, utilization, current_concurrency);
+        _thread_nanos_since_adjust += nanos;
+        if (_should_increase_concurrency(current_concurrency, utilization)) {
+            _reset_adjust_counter();
+            ++_target_concurrency;
+        }
+    }
+
+    int target_concurrency() const { return _target_concurrency; }
+
+    // Called when a split finished: lowers the target (never below 1) on high
+    // utilization, raises it on low utilization when the target is already in use.
+    void split_finished(uint64_t split_nanos, double utilization, int current_concurrency) {
+        _validate_args(split_nanos, utilization, current_concurrency);
+        if (_should_adjust(split_nanos)) {
+            if (utilization > TARGET_UTIL && _target_concurrency > 1) {
+                _reset_adjust_counter();
+                --_target_concurrency;
+            } else if (utilization < TARGET_UTIL && current_concurrency >= _target_concurrency) {
+                _reset_adjust_counter();
+                ++_target_concurrency;
+            }
+        }
+    }
+
+private:
+    static constexpr double TARGET_UTIL = 0.5;
+    const std::chrono::nanoseconds _adjustment_interval_nanos;
+    int _target_concurrency;
+    uint64_t _thread_nanos_since_adjust = 0;
+
+    void _validate_args(uint64_t nanos, double util, int concurrency) const {
+        // `nanos` is unsigned, so `nanos >= 0` could never fail. A negative value passed
+        // by a caller arrives here as a value above INT64_MAX, which is what is rejected.
+        CHECK(nanos <= static_cast<uint64_t>(std::numeric_limits<int64_t>::max()))
+                << "Negative nanos";
+        CHECK(std::isfinite(util)) << "Invalid utilization";
+        CHECK(util >= 0) << "Negative utilization";
+        CHECK(concurrency >= 0) << "Negative concurrency";
+    }
+
+    // Adjustment interval as an unsigned nanosecond count, the same type as
+    // _thread_nanos_since_adjust, so the comparisons below have no implicit conversion.
+    uint64_t _interval_nanos() const {
+        return static_cast<uint64_t>(_adjustment_interval_nanos.count());
+    }
+
+    bool _should_increase_concurrency(int curr_concurrency, double util) const {
+        return _thread_nanos_since_adjust >= _interval_nanos() && util < TARGET_UTIL &&
+               curr_concurrency >= _target_concurrency;
+    }
+
+    bool _should_adjust(uint64_t split_nanos) const {
+        return _thread_nanos_since_adjust >= _interval_nanos() ||
+               _thread_nanos_since_adjust >= split_nanos;
+    }
+
+    void _reset_adjust_counter() { _thread_nanos_since_adjust = 0; }
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp new file mode 100644 index 0000000..82907f1 --- /dev/null +++ b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp
@@ -0,0 +1,435 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/executor/time_sharing/time_sharing_task_executor.h"
+
+#include <algorithm>
+#include <functional>
+#include <iostream>
+#include <limits>
+
+#include "util/defer_op.h"
+#include "util/thread.h"
+#include "util/threadpool.h"
+#include "vec/exec/executor/time_sharing/time_sharing_task_handle.h"
+
+namespace doris {
+namespace vectorized {
+
+TimeSharingTaskExecutor::TimeSharingTaskExecutor(
+        int runner_threads, int min_concurrency, int guaranteed_concurrency_per_task,
+        int max_concurrency_per_task, std::shared_ptr<Ticker> ticker,
+        std::chrono::milliseconds stuck_split_warning_threshold,
+        std::shared_ptr<MultilevelSplitQueue> split_queue)
+        : _runner_threads(runner_threads),
+          _min_concurrency(min_concurrency),
+          _guaranteed_concurrency_per_task(guaranteed_concurrency_per_task),
+          _max_concurrency_per_task(max_concurrency_per_task),
+          _ticker(ticker != nullptr ? ticker : std::make_shared<SystemTicker>()),
+          _stuck_split_warning_threshold(stuck_split_warning_threshold),
+          _waiting_splits(split_queue != nullptr ? std::move(split_queue)
+                                                 : std::make_shared<MultilevelSplitQueue>(2)) {}
+
+// Builds the "SplitRunner" thread pool (min = runner threads, max = twice that).
+Status TimeSharingTaskExecutor::init() {
+    ThreadPoolBuilder builder("SplitRunner");
+    builder.set_min_threads(_runner_threads).set_max_threads(_runner_threads * 2);
+    RETURN_IF_ERROR(builder.build(&_thread_pool));
+    return Status::OK();
+}
+
+// Stops the executor if that has not happened yet, then closes every open task and its splits.
+TimeSharingTaskExecutor::~TimeSharingTaskExecutor() {
+    if (!_stopped.exchange(true)) {
+        stop();
+    }
+
+    std::vector<std::shared_ptr<PrioritizedSplitRunner>> splits_to_destroy;
+    {
+        std::lock_guard<std::mutex> guard(_mutex);
+        for (auto& [task_id, handle] : _tasks) {
+            if (handle->is_closed()) {
+                LOG(WARNING) << "task is already destroyed, task_id: " << task_id.to_string();
+                continue;
+            }
+            auto task_splits = handle->close();
+            splits_to_destroy.insert(splits_to_destroy.end(),
+                                     std::make_move_iterator(task_splits.begin()),
+                                     std::make_move_iterator(task_splits.end()));
+            _record_leaf_splits_size(guard);
+        }
+        //_tasks.clear();
+        //_all_splits.clear();
+        //_intermediate_splits.clear();
+        //_blocked_splits.clear();
+        _waiting_splits->remove_all(splits_to_destroy);
+    }
+
+    if (splits_to_destroy.empty()) {
+        return;
+    }
+
+    for (auto& split : splits_to_destroy) {
+        split->close(Status::OK());
+    }
+
+    if (_thread_pool) {
+        _thread_pool->shutdown();
+    }
+}
+
+Status TimeSharingTaskExecutor::start() {
+    std::lock_guard<std::mutex> guard(_mutex);
+    for (int i = 0; i < _runner_threads; ++i) {
+        RETURN_IF_ERROR(_add_runner_thread());
+    }
+    return Status::OK();
+}
+
+void TimeSharingTaskExecutor::stop() {
+    _waiting_splits->interrupt();
+    {
+        std::lock_guard<std::mutex> guard(_mutex);
+        _stopped = true;
+    }
+    if (_thread_pool) {
+        _thread_pool->shutdown();
+    }
+}
+
+// Submits one runner loop to the pool. Each iteration takes a split from the waiting
+// queue, runs one quantum of it, and then finishes, re-queues or parks the split.
+Status TimeSharingTaskExecutor::_add_runner_thread() {
+    return _thread_pool->submit_func([this]() {
+        Thread::set_self_name("SplitRunner");
+
+        while (!_stopped) {
+            std::shared_ptr<PrioritizedSplitRunner> split;
+            split = _waiting_splits->take();
+            if (!split) {
+                return;
+            }
+
+            // ScopedRunnerTracker tracker(_mutex, _running_splits, split);
+            {
+                std::lock_guard<std::mutex> guard(_mutex);
+                _running_splits.insert(split);
+            }
+            Defer defer {[&]() {
+                std::lock_guard<std::mutex> guard(_mutex);
+                _running_splits.erase(split);
+            }};
+            Result<SharedListenableFuture<Void>> blocked_future_result = split->process();
+            if (!blocked_future_result.has_value()) {
+                // A failing split must not take the runner thread down with it: finish the
+                // split with the error and keep serving the remaining splits.
+                LOG(WARNING) << "split process failed, split_id: " << split->split_id()
+                             << ", status: " << blocked_future_result.error();
+                _split_finished(split, blocked_future_result.error());
+                continue;
+            }
+            auto blocked_future = blocked_future_result.value();
+
+            if (split->is_finished()) {
+                _split_finished(split, split->finished_status());
+            } else {
+                std::lock_guard<std::mutex> guard(_mutex);
+                if (blocked_future.is_done()) {
+                    _waiting_splits->offer(split);
+                } else {
+                    _blocked_splits[split] = blocked_future;
+
+                    // NOTE(review): add_callback() is invoked while _mutex is held and the
+                    // callback locks _mutex itself; confirm it can never run inline here.
+                    _blocked_splits[split].add_callback(
+                            [this, split](const Void& value, const Status& status) {
+                                if (status.ok()) {
+                                    std::lock_guard<std::mutex> guard(_mutex);
+                                    _blocked_splits.erase(split);
+                                    split->reset_level_priority();
+                                    _waiting_splits->offer(split);
+                                } else {
+                                    LOG(WARNING) << "blocked split is failed, split_id: "
+                                                 << split->split_id() << ", status: " << status;
+                                    _split_finished(split, status);
+                                }
+                            });
+                }
+            }
+        }
+    });
+}
+
+Result<std::shared_ptr<TaskHandle>> TimeSharingTaskExecutor::create_task(
+        const TaskId& task_id, std::function<double()> utilization_supplier,
+        int initial_split_concurrency, std::chrono::nanoseconds split_concurrency_adjust_frequency,
+        std::optional<int> max_concurrency_per_task) {
+    auto task_handle = std::make_shared<TimeSharingTaskHandle>(
+            task_id, _waiting_splits, utilization_supplier, initial_split_concurrency,
+            split_concurrency_adjust_frequency, max_concurrency_per_task);
+    RETURN_IF_ERROR_RESULT(task_handle->init());
+
+    std::lock_guard<std::mutex> lock(_mutex);
+
+    _tasks[task_id] = task_handle;
+
+    return task_handle;
+}
+
+// Registers an existing handle; it must actually be a TimeSharingTaskHandle.
+Status TimeSharingTaskExecutor::add_task(const TaskId& task_id,
+                                         std::shared_ptr<TaskHandle> task_handle) {
+    auto handle = std::dynamic_pointer_cast<doris::vectorized::TimeSharingTaskHandle>(task_handle);
+    if (handle == nullptr) {
+        return Status::InvalidArgument("task handle is not a TimeSharingTaskHandle");
+    }
+    std::lock_guard<std::mutex> lock(_mutex);
+    _tasks[task_id] = std::move(handle);
+    return Status::OK();
+}
+
+// Closes the task, drops its splits from all bookkeeping and back-fills the freed slots.
+Status TimeSharingTaskExecutor::remove_task(std::shared_ptr<TaskHandle> task_handle) {
+    auto handle = std::dynamic_pointer_cast<TimeSharingTaskHandle>(task_handle);
+    if (handle == nullptr) {
+        return Status::InvalidArgument("task handle is not a TimeSharingTaskHandle");
+    }
+    std::vector<std::shared_ptr<PrioritizedSplitRunner>> splits_to_destroy;
+
+    {
+        std::lock_guard<std::mutex> guard(_mutex);
+        auto it = _tasks.find(handle->task_id());
+        if (it == _tasks.end() || handle->is_closed()) {
+            return Status::OK();
+        }
+        _tasks.erase(it);
+
+        // Task is already closed
+        if (task_handle->is_closed()) {
+            return Status::OK();
+        }
+
+        splits_to_destroy = handle->close();
+
+        for (const auto& split : splits_to_destroy) {
+            _all_splits.erase(split);
+            _intermediate_splits.erase(split);
+            _blocked_splits.erase(split);
+        }
+        _waiting_splits->remove_all(splits_to_destroy);
+        _record_leaf_splits_size(guard);
+    }
+
+    // call destroy outside of synchronized block as it is expensive and doesn't need a lock on the task executor
+    for (auto& split : splits_to_destroy) {
+        split->close(Status::OK());
+    }
+
+    // record completed stats
+    int64_t thread_usage_nanos = handle->scheduled_nanos();
+    int level = MultilevelSplitQueue::compute_level(thread_usage_nanos);
+    _completed_tasks_per_level[level]++;
+
+    if (splits_to_destroy.empty()) {
+        return Status::OK();
+    }
+
+    // replace blocked splits that were terminated
+    {
+        std::lock_guard<std::mutex> guard(_mutex);
+        _add_new_entrants(guard);
+        _record_leaf_splits_size(guard);
+    }
+    return Status::OK();
+}
+
+Result<std::vector<SharedListenableFuture<Void>>> TimeSharingTaskExecutor::enqueue_splits(
+        std::shared_ptr<TaskHandle> task_handle, bool intermediate,
+        const std::vector<std::shared_ptr<SplitRunner>>& splits) {
+    auto handle = std::dynamic_pointer_cast<TimeSharingTaskHandle>(task_handle);
+    if (handle == nullptr) {
+        return unexpected(
+                Status::InvalidArgument("task handle is not a TimeSharingTaskHandle"));
+    }
+
+    std::vector<std::shared_ptr<PrioritizedSplitRunner>> splits_to_destroy;
+    Defer defer {[&]() {
+        for (auto& split : splits_to_destroy) {
+            split->close(Status::OK());
+        }
+    }};
+    std::vector<SharedListenableFuture<Void>> finished_futures;
+    {
+        std::lock_guard<std::mutex> guard(_mutex);
+        for (const auto& task_split : splits) {
+            int split_id = handle->next_split_id();
+
+            auto prioritized_split =
+                    PrioritizedSplitRunner::create_shared(handle, split_id, task_split, _ticker);
+            RETURN_IF_ERROR_RESULT(prioritized_split->init());
+            if (intermediate) {
+                if (handle->record_intermediate_split(prioritized_split)) {
+                    _start_intermediate_split(prioritized_split, guard);
+                } else {
+                    splits_to_destroy.push_back(prioritized_split);
+                }
+            } else {
+                if (handle->enqueue_split(prioritized_split)) {
+                    _schedule_task_if_necessary(handle, guard);
+                    _add_new_entrants(guard);
+                } else {
+                    splits_to_destroy.push_back(prioritized_split);
+                }
+            }
+            finished_futures.push_back(prioritized_split->finished_future());
+        }
+        _record_leaf_splits_size(guard);
+    }
+    return finished_futures;
+}
+
+void TimeSharingTaskExecutor::_split_finished(std::shared_ptr<PrioritizedSplitRunner> split,
+                                              const Status& status) {
+    _completed_splits_per_level[split->priority().level()]++;
+    {
+        std::lock_guard<std::mutex> guard(_mutex);
+        _all_splits.erase(split);
+
+        auto task_handle = split->task_handle();
+        task_handle->split_finished(split);
+
+        _schedule_task_if_necessary(task_handle, guard);
+
+        _add_new_entrants(guard);
+        _record_leaf_splits_size(guard);
+    }
+    // call close outside of synchronized block as it is expensive and doesn't need a lock on the task executor
+    split->close(status);
+}
+
+void TimeSharingTaskExecutor::_schedule_task_if_necessary(
+        std::shared_ptr<TimeSharingTaskHandle> task_handle, std::lock_guard<std::mutex>& guard) {
+    int guaranteed_concurrency = std::min(
+            _guaranteed_concurrency_per_task,
+            task_handle->max_concurrency_per_task().value_or(std::numeric_limits<int>::max()));
+    int splits_to_schedule = guaranteed_concurrency - task_handle->running_leaf_splits();
+
+    for (int i = 0; i < splits_to_schedule; ++i) {
+        auto split = task_handle->poll_next_split();
+        if (!split) return;
+
+        _start_split(split, guard);
+        auto elapsed_nanos = std::chrono::nanoseconds(
+                std::chrono::steady_clock::now().time_since_epoch().count() -
+                split->created_nanos());
+        _split_queued_time
+                << std::chrono::duration_cast<std::chrono::microseconds>(elapsed_nanos).count();
+    }
+    _record_leaf_splits_size(guard);
+}
+
+void TimeSharingTaskExecutor::_add_new_entrants(std::lock_guard<std::mutex>& guard) {
+    int running = _all_splits.size() - _intermediate_splits.size();
+    for (int i = 0; i < _min_concurrency - running; i++) {
+        auto split = _poll_next_split_worker(guard);
+        if (!split) {
+            break;
+        }
+
+        auto elapsed_nanos = std::chrono::nanoseconds(
+                std::chrono::steady_clock::now().time_since_epoch().count() -
+                split->created_nanos());
+        _split_queued_time
+                << std::chrono::duration_cast<std::chrono::microseconds>(elapsed_nanos).count();
+        _start_split(split, guard);
+    }
+}
+
+void TimeSharingTaskExecutor::_start_intermediate_split(
+        std::shared_ptr<PrioritizedSplitRunner> split, std::lock_guard<std::mutex>& guard) {
+    _start_split(split, guard);
+    _intermediate_splits.insert(split);
+}
+
+void TimeSharingTaskExecutor::_start_split(std::shared_ptr<PrioritizedSplitRunner> split,
+                                           std::lock_guard<std::mutex>& guard) {
+    _all_splits.insert(split);
+    _waiting_splits->offer(split);
+}
+
+std::shared_ptr<PrioritizedSplitRunner> TimeSharingTaskExecutor::_poll_next_split_worker(
+        std::lock_guard<std::mutex>& guard) {
+    // Walk the task list and find the first task that can produce a split,
+    // then move that task to the end of the list to get round-robin scheduling.
+    // NOTE(review): whether erase + re-insert really moves the entry to the end depends
+    // on the container type of _tasks, which is declared in the header - confirm.
+    for (auto it = _tasks.begin(); it != _tasks.end();) {
+        auto task = it->second;
+        // Skip tasks that already run the configured maximum number of drivers
+        if (task->running_leaf_splits() >=
+            task->max_concurrency_per_task().value_or(_max_concurrency_per_task)) {
+            ++it;
+            continue;
+        }
+
+        auto split = task->poll_next_split();
+        if (split) {
+            // Move the task to the end of the list
+            auto task_copy = task;
+            auto task_id = it->first;
+            it = _tasks.erase(it);
+            _tasks[task_id] = task_copy;
+            return split;
+        }
+        ++it;
+    }
+    return nullptr;
+}
+
+void TimeSharingTaskExecutor::_record_leaf_splits_size(std::lock_guard<std::mutex>& guard) {
+    //    auto now = _ticker->read();
+    //    auto time_difference = now - _last_leaf_splits_size_record_time;
+    //
+    //    if (time_difference > 0) {
+    //        // Record the current number of leaf splits and how long it lasted
+    //        // _leaf_splits_size.add(_last_leaf_splits_size, std::chrono::nanoseconds(time_difference));
+    //        _last_leaf_splits_size_record_time = now;
+    //    }
+    //    // always record new lastLeafSplitsSize as it might have changed
+    //    // even if timeDifference is 0
+    //    _last_leaf_splits_size = _all_splits.size() - _intermediate_splits.size();
+}
+
+void TimeSharingTaskExecutor::_interrupt() {
+    std::lock_guard<std::mutex> guard(_mutex);
+    _condition.notify_all();
+    _waiting_splits->interrupt();
+}
+
+int64_t TimeSharingTaskExecutor::_get_running_tasks_for_level(int level) const {
+    std::lock_guard<std::mutex> guard(_mutex);
+    int64_t count = 0;
+    for (const auto& [task_id, task] : _tasks) {
+        if (task->priority().level() == level) {
+            count++;
+        }
+    }
+    return count;
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h new file mode 100644 index 0000000..c3fe520 --- /dev/null +++ b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <array>
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <cstddef>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <queue>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "bvar/latency_recorder.h"
+#include "util/threadpool.h"
+#include "vec/exec/executor/listenable_future.h"
+#include "vec/exec/executor/stats.h"
+#include "vec/exec/executor/task_executor.h"
+#include "vec/exec/executor/ticker.h"
+#include "vec/exec/executor/time_sharing/multilevel_split_queue.h"
+#include "vec/exec/executor/time_sharing/prioritized_split_runner.h"
+
+namespace doris {
+namespace vectorized {
+
+/**
+ * TaskExecutor implementation that keeps pending splits in a MultilevelSplitQueue
+ * (`_waiting_splits`) and tracks tasks and splits in containers guarded by `_mutex`.
+ *
+ * ThreadSafe
+ */
+class TimeSharingTaskExecutor : public TaskExecutor {
+    ENABLE_FACTORY_CREATOR(TimeSharingTaskExecutor);
+
+public:
+    // `stuck_split_warning_threshold` defaults to 60 seconds; `split_queue` defaults to nullptr.
+    // NOTE(review): the handling of a null `split_queue` lives in the .cpp -- confirm there.
+    TimeSharingTaskExecutor(int threads, int min_concurrency, int guaranteed_concurrency_per_task,
+                            int max_concurrency_per_task, std::shared_ptr<Ticker> ticker,
+                            std::chrono::milliseconds stuck_split_warning_threshold =
+                                    std::chrono::milliseconds(60000),
+                            std::shared_ptr<MultilevelSplitQueue> split_queue = nullptr);
+
+    ~TimeSharingTaskExecutor() override;
+
+    Status init() override;
+
+    Status start() override;
+    void stop() override;
+
+    Result<std::shared_ptr<TaskHandle>> create_task(
+            const TaskId& task_id, std::function<double()> utilization_supplier,
+            int initial_split_concurrency,
+            std::chrono::nanoseconds split_concurrency_adjust_frequency,
+            std::optional<int> max_concurrency_per_task) override;
+
+    Status add_task(const TaskId& task_id, std::shared_ptr<TaskHandle> task_handle) override;
+
+    Status remove_task(std::shared_ptr<TaskHandle> task_handle) override;
+
+    Result<std::vector<SharedListenableFuture<Void>>> enqueue_splits(
+            std::shared_ptr<TaskHandle> task_handle, bool intermediate,
+            const std::vector<std::shared_ptr<SplitRunner>>& splits) override;
+
+    // --- Introspection getters -------------------------------------------------------------
+    // The container getters below take `_mutex`, so they must not be called while the calling
+    // thread already holds it (std::mutex is not recursive).
+
+    // Delegates to the queue; no executor lock is taken here.
+    size_t waiting_splits_size() const { return _waiting_splits->size(); }
+
+    size_t intermediate_splits_size() const {
+        std::lock_guard<std::mutex> guard(_mutex);
+        return _intermediate_splits.size();
+    }
+
+    // `_running_splits` is mutated by ScopedRunnerTracker under the mutex it is handed, so read
+    // it under `_mutex` as well instead of racing with those updates.
+    size_t running_splits_size() const {
+        std::lock_guard<std::mutex> guard(_mutex);
+        return _running_splits.size();
+    }
+
+    // Locked for the same reason, and for consistency with the other container getters.
+    size_t blocked_splits_size() const {
+        std::lock_guard<std::mutex> guard(_mutex);
+        return _blocked_splits.size();
+    }
+
+    size_t total_splits_size() const {
+        std::lock_guard<std::mutex> guard(_mutex);
+        return _all_splits.size();
+    }
+
+    size_t tasks_size() const {
+        std::lock_guard<std::mutex> guard(_mutex);
+        return _tasks.size();
+    }
+
+    // Per-level counters; the backing arrays have exactly five atomic slots (levels 0..4).
+    int64_t completed_tasks_level0() const { return _completed_tasks_per_level[0]; }
+
+    int64_t completed_tasks_level1() const { return _completed_tasks_per_level[1]; }
+
+    int64_t completed_tasks_level2() const { return _completed_tasks_per_level[2]; }
+
+    int64_t completed_tasks_level3() const { return _completed_tasks_per_level[3]; }
+
+    int64_t completed_tasks_level4() const { return _completed_tasks_per_level[4]; }
+
+    int64_t completed_splits_level0() const { return _completed_splits_per_level[0]; }
+
+    int64_t completed_splits_level1() const { return _completed_splits_per_level[1]; }
+
+    int64_t completed_splits_level2() const { return _completed_splits_per_level[2]; }
+
+    int64_t completed_splits_level3() const { return _completed_splits_per_level[3]; }
+
+    int64_t completed_splits_level4() const { return _completed_splits_per_level[4]; }
+
+    int64_t running_tasks_level0() const { return _get_running_tasks_for_level(0); }
+
+    int64_t running_tasks_level1() const { return _get_running_tasks_for_level(1); }
+
+    int64_t running_tasks_level2() const { return _get_running_tasks_for_level(2); }
+
+    int64_t running_tasks_level3() const { return _get_running_tasks_for_level(3); }
+
+    int64_t running_tasks_level4() const { return _get_running_tasks_for_level(4); }
+
+private:
+    // Scope guard: inserts `split` into `running_splits` under `mutex` on construction and
+    // erases it again, under the same mutex, on destruction.
+    class ScopedRunnerTracker {
+    public:
+        ScopedRunnerTracker(
+                std::mutex& mutex,
+                std::unordered_set<std::shared_ptr<PrioritizedSplitRunner>>& running_splits,
+                std::shared_ptr<PrioritizedSplitRunner> split)
+                : _mutex(mutex), _running_splits(running_splits), _split(std::move(split)) {
+            std::lock_guard<std::mutex> guard(_mutex);
+            _running_splits.insert(_split);
+        }
+
+        ~ScopedRunnerTracker() {
+            std::lock_guard<std::mutex> guard(_mutex);
+            _running_splits.erase(_split);
+        }
+
+    private:
+        std::mutex& _mutex;
+        std::unordered_set<std::shared_ptr<PrioritizedSplitRunner>>& _running_splits;
+        std::shared_ptr<PrioritizedSplitRunner> _split;
+    };
+
+    // Worker bound to one executor; `run()` and `stop()` are defined in the .cpp.
+    class TaskRunner {
+    public:
+        TaskRunner(TimeSharingTaskExecutor& executor);
+        void run();
+        void stop();
+
+    private:
+        TimeSharingTaskExecutor& _executor;
+        std::atomic<bool> _running {true};
+    };
+
+    Status _add_runner_thread();
+    // The helpers taking a `std::lock_guard<std::mutex>&` receive the caller's guard.
+    // NOTE(review): presumably this is the guard over `_mutex` -- confirm in the .cpp.
+    void _schedule_task_if_necessary(std::shared_ptr<TimeSharingTaskHandle> task_handle,
+                                     std::lock_guard<std::mutex>& guard);
+    void _add_new_entrants(std::lock_guard<std::mutex>& guard);
+    void _start_intermediate_split(std::shared_ptr<PrioritizedSplitRunner> split,
+                                   std::lock_guard<std::mutex>& guard);
+    void _start_split(std::shared_ptr<PrioritizedSplitRunner> split,
+                      std::lock_guard<std::mutex>& guard);
+    std::shared_ptr<PrioritizedSplitRunner> _poll_next_split_worker(
+            std::lock_guard<std::mutex>& guard);
+    void _record_leaf_splits_size(std::lock_guard<std::mutex>& guard);
+    void _split_finished(std::shared_ptr<PrioritizedSplitRunner> split, const Status& status);
+    void _interrupt();
+
+    int64_t _get_running_tasks_for_level(int level) const;
+
+    std::unique_ptr<ThreadPool> _thread_pool;
+    const int _runner_threads;
+    const int _min_concurrency;
+    const int _guaranteed_concurrency_per_task;
+    const int _max_concurrency_per_task;
+    std::shared_ptr<Ticker> _ticker;
+    const std::chrono::milliseconds _stuck_split_warning_threshold;
+    std::shared_ptr<MultilevelSplitQueue> _waiting_splits;
+
+    // `mutable` so that const getters can lock it.
+    mutable std::mutex _mutex;
+    std::condition_variable _condition;
+    std::atomic<bool> _stopped {false};
+
+    std::vector<std::unique_ptr<TaskRunner>> _task_runners;
+
+    std::unordered_map<TaskId, std::shared_ptr<TimeSharingTaskHandle>> _tasks;
+
+    std::unordered_set<std::shared_ptr<PrioritizedSplitRunner>> _all_splits;
+    std::unordered_set<std::shared_ptr<PrioritizedSplitRunner>> _intermediate_splits;
+    std::unordered_set<std::shared_ptr<PrioritizedSplitRunner>> _running_splits;
+    std::unordered_map<std::shared_ptr<PrioritizedSplitRunner>, SharedListenableFuture<Void>>
+            _blocked_splits;
+    std::array<std::atomic<int64_t>, 5> _completed_tasks_per_level = {0, 0, 0, 0, 0};
+    std::array<std::atomic<int64_t>, 5> _completed_splits_per_level = {0, 0, 0, 0, 0};
+
+    bvar::LatencyRecorder _split_queued_time;
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/time_sharing/time_sharing_task_handle.cpp b/be/src/vec/exec/executor/time_sharing/time_sharing_task_handle.cpp new file mode 100644 index 0000000..0979b72 --- /dev/null +++ b/be/src/vec/exec/executor/time_sharing/time_sharing_task_handle.cpp
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/executor/time_sharing/time_sharing_task_handle.h"
+
+#include <algorithm>
+#include <utility>
+
+#include "vec/exec/executor/time_sharing/multilevel_split_queue.h"
+
+namespace doris {
+namespace vectorized {
+
+TimeSharingTaskHandle::TimeSharingTaskHandle(
+        const TaskId& task_id, std::shared_ptr<MultilevelSplitQueue> split_queue,
+        std::function<double()> utilization_supplier, int initial_split_concurrency,
+        std::chrono::nanoseconds split_concurrency_adjust_frequency,
+        std::optional<int> max_concurrency_per_task)
+        : _task_id(task_id),
+          _split_queue(std::move(split_queue)),
+          _utilization_supplier(std::move(utilization_supplier)),
+          _max_concurrency_per_task(max_concurrency_per_task),
+          _concurrency_controller(initial_split_concurrency, split_concurrency_adjust_frequency) {}
+
+// Nothing to initialize; always succeeds.
+Status TimeSharingTaskHandle::init() {
+    return Status::OK();
+}
+
+// Accounts `duration_nanos` of scheduled time to this task.
+// Under `_mutex`: feeds the concurrency controller with the current utilization and number of
+// running leaf splits, accumulates `_scheduled_nanos`, asks the split queue for the updated
+// priority, stores it, and returns it.
+Priority TimeSharingTaskHandle::add_scheduled_nanos(int64_t duration_nanos) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    _concurrency_controller.update(duration_nanos, _utilization_supplier(),
+                                   _running_leaf_splits.size());
+    _scheduled_nanos += duration_nanos;
+
+    Priority new_priority =
+            _split_queue->update_priority(_priority, duration_nanos, _scheduled_nanos);
+
+    _priority = new_priority;
+    return new_priority;
+}
+
+// Raises the level priority to the split queue's minimum for the current level when it is
+// below that minimum; the level itself is never changed. Returns the resulting priority.
+Priority TimeSharingTaskHandle::reset_level_priority() {
+    std::lock_guard<std::mutex> lock(_mutex);
+
+    Priority current_priority = _priority;
+    int64_t level_min_priority =
+            _split_queue->get_level_min_priority(current_priority.level(), _scheduled_nanos);
+
+    if (current_priority.level_priority() < level_min_priority) {
+        Priority new_priority(current_priority.level(), level_min_priority);
+        _priority = new_priority;
+        return new_priority;
+    }
+
+    return current_priority;
+}
+
+bool TimeSharingTaskHandle::is_closed() const {
+    return _closed.load();
+}
+
+// Returns a copy of the current priority, read under `_mutex`.
+Priority TimeSharingTaskHandle::priority() const {
+    std::lock_guard<std::mutex> lock(_mutex);
+    return _priority;
+}
+
+TaskId TimeSharingTaskHandle::task_id() const {
+    return _task_id;
+}
+
+std::optional<int> TimeSharingTaskHandle::max_concurrency_per_task() const {
+    return _max_concurrency_per_task;
+}
+
+// Marks the handle closed and hands back every split it still tracks: running intermediate
+// splits, then running leaf splits, then queued leaf splits in queue order. All three
+// collections are emptied. A second call returns an empty vector.
+std::vector<std::shared_ptr<PrioritizedSplitRunner>> TimeSharingTaskHandle::close() {
+    std::lock_guard<std::mutex> lock(_mutex);
+
+    if (_closed.exchange(true)) {
+        return {};
+    }
+
+    std::vector<std::shared_ptr<PrioritizedSplitRunner>> result;
+    result.reserve(_running_intermediate_splits.size() + _running_leaf_splits.size() +
+                   _queued_leaf_splits.size());
+
+    result.insert(result.end(), _running_intermediate_splits.begin(),
+                  _running_intermediate_splits.end());
+    result.insert(result.end(), _running_leaf_splits.begin(), _running_leaf_splits.end());
+
+    while (!_queued_leaf_splits.empty()) {
+        result.push_back(_queued_leaf_splits.front());
+        _queued_leaf_splits.pop();
+    }
+
+    _running_intermediate_splits.clear();
+    _running_leaf_splits.clear();
+
+    return result;
+}
+
+// Queues a leaf split. Returns false, without queueing, once the handle is closed.
+bool TimeSharingTaskHandle::enqueue_split(std::shared_ptr<PrioritizedSplitRunner> split) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    if (_closed) {
+        return false;
+    }
+    _queued_leaf_splits.emplace(std::move(split));
+    return true;
+}
+
+// Records an intermediate split as running. Returns false once the handle is closed.
+bool TimeSharingTaskHandle::record_intermediate_split(
+        std::shared_ptr<PrioritizedSplitRunner> split) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    if (_closed) {
+        return false;
+    }
+    _running_intermediate_splits.emplace_back(std::move(split));
+    return true;
+}
+
+int TimeSharingTaskHandle::running_leaf_splits() const {
+    std::lock_guard<std::mutex> lock(_mutex);
+    // The container size is size_t; make the narrowing to the declared int explicit.
+    return static_cast<int>(_running_leaf_splits.size());
+}
+
+int64_t TimeSharingTaskHandle::scheduled_nanos() const {
+    std::lock_guard<std::mutex> lock(_mutex);
+    return _scheduled_nanos;
+}
+
+// Moves the next queued leaf split into the running set and returns it.
+// Returns nullptr when the handle is closed, when the running leaf splits have reached the
+// controller's target concurrency, or when nothing is queued.
+std::shared_ptr<PrioritizedSplitRunner> TimeSharingTaskHandle::poll_next_split() {
+    std::lock_guard<std::mutex> lock(_mutex);
+
+    if (_closed) {
+        return nullptr;
+    }
+
+    if (_running_leaf_splits.size() >= _concurrency_controller.target_concurrency()) {
+        return nullptr;
+    }
+
+    if (_queued_leaf_splits.empty()) {
+        return nullptr;
+    }
+
+    auto split = _queued_leaf_splits.front();
+    _queued_leaf_splits.pop();
+    _running_leaf_splits.push_back(split);
+    return split;
+}
+
+// Reports a finished split to the concurrency controller and stops tracking it.
+// `split` must not be null: it is dereferenced unconditionally.
+void TimeSharingTaskHandle::split_finished(std::shared_ptr<PrioritizedSplitRunner> split) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    // The controller is notified before removal, so the running count still includes `split`
+    // when it is a leaf split.
+    _concurrency_controller.split_finished(split->scheduled_nanos(), _utilization_supplier(),
+                                           _running_leaf_splits.size());
+
+    // The split is in at most one of the two running lists; both are searched.
+    auto it = std::find(_running_intermediate_splits.begin(), _running_intermediate_splits.end(),
+                        split);
+    if (it != _running_intermediate_splits.end()) {
+        _running_intermediate_splits.erase(it);
+    }
+
+    it = std::find(_running_leaf_splits.begin(), _running_leaf_splits.end(), split);
+    if (it != _running_leaf_splits.end()) {
+        _running_leaf_splits.erase(it);
+    }
+}
+
+// Returns the current split id and post-increments the atomic counter.
+int TimeSharingTaskHandle::next_split_id() {
+    return _next_split_id.fetch_add(1);
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/time_sharing/time_sharing_task_handle.h b/be/src/vec/exec/executor/time_sharing/time_sharing_task_handle.h new file mode 100644 index 0000000..3373a68 --- /dev/null +++ b/be/src/vec/exec/executor/time_sharing/time_sharing_task_handle.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <chrono>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <queue>
+#include <vector>
+
+#include "vec/exec/executor/task_handle.h"
+#include "vec/exec/executor/task_id.h"
+#include "vec/exec/executor/time_sharing/prioritized_split_runner.h"
+#include "vec/exec/executor/time_sharing/priority.h"
+#include "vec/exec/executor/time_sharing/split_concurrency_controller.h"
+
+namespace doris {
+namespace vectorized {
+
+class MultilevelSplitQueue;
+
+// Per-task state for the time-sharing executor: the task's queued and running splits, its
+// accumulated scheduled time, its current Priority and its split concurrency controller.
+// Every method that touches the split collections, `_scheduled_nanos` or `_priority` locks
+// `_mutex`; `_closed` and `_next_split_id` are atomics.
+class TimeSharingTaskHandle : public TaskHandle {
+    ENABLE_FACTORY_CREATOR(TimeSharingTaskHandle);
+
+public:
+    TimeSharingTaskHandle(const TaskId& task_id, std::shared_ptr<MultilevelSplitQueue> split_queue,
+                          std::function<double()> utilization_supplier,
+                          int initial_split_concurrency,
+                          std::chrono::nanoseconds split_concurrency_adjust_frequency,
+                          std::optional<int> max_concurrency_per_task);
+
+    Status init() override;
+
+    // Adds `duration_nanos` to the task's scheduled time and returns the updated priority.
+    Priority add_scheduled_nanos(int64_t duration_nanos);
+    // Lifts the level priority to the queue's minimum for the current level if it is below it.
+    Priority reset_level_priority();
+    bool is_closed() const override;
+    Priority priority() const;
+    TaskId task_id() const override;
+    std::optional<int> max_concurrency_per_task() const;
+    // Closes the handle and returns every split it was still tracking (empty on repeat calls).
+    std::vector<std::shared_ptr<PrioritizedSplitRunner>> close();
+    // Both return false, and do nothing, once the handle has been closed.
+    bool enqueue_split(std::shared_ptr<PrioritizedSplitRunner> split);
+    bool record_intermediate_split(std::shared_ptr<PrioritizedSplitRunner> split);
+    int running_leaf_splits() const;
+    int64_t scheduled_nanos() const;
+    // Returns nullptr when closed, at target concurrency, or when no leaf split is queued.
+    std::shared_ptr<PrioritizedSplitRunner> poll_next_split();
+    void split_finished(std::shared_ptr<PrioritizedSplitRunner> split);
+    int next_split_id();
+
+private:
+    // `mutable` so that const accessors can lock it.
+    mutable std::mutex _mutex;
+    std::atomic<bool> _closed {false};
+    const TaskId _task_id;
+    std::shared_ptr<MultilevelSplitQueue> _split_queue;
+    std::function<double()> _utilization_supplier;
+    std::optional<int> _max_concurrency_per_task;
+    SplitConcurrencyController _concurrency_controller;
+
+    std::queue<std::shared_ptr<PrioritizedSplitRunner>> _queued_leaf_splits;
+    std::vector<std::shared_ptr<PrioritizedSplitRunner>> _running_leaf_splits;
+    std::vector<std::shared_ptr<PrioritizedSplitRunner>> _running_intermediate_splits;
+    int64_t _scheduled_nanos {0};
+    std::atomic<int> _next_split_id {0};
+    // Plain value rather than std::atomic<Priority>; it is read and written under `_mutex`.
+    Priority _priority {0, 0};
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/test/vec/exec/executor/time_sharing/multilevel_split_queue_test.cpp b/be/test/vec/exec/executor/time_sharing/multilevel_split_queue_test.cpp new file mode 100644 index 0000000..cef23ff --- /dev/null +++ b/be/test/vec/exec/executor/time_sharing/multilevel_split_queue_test.cpp
@@ -0,0 +1,260 @@ +//// Licensed to the Apache Software Foundation (ASF) under one +//// or more contributor license agreements. See the NOTICE file +//// distributed with this work for additional information +//// regarding copyright ownership. The ASF licenses this file +//// to you under the Apache License, Version 2.0 (the +//// "License"); you may not use this file except in compliance +//// with the License. You may obtain a copy of the License at +//// +//// http://www.apache.org/licenses/LICENSE-2.0 +//// +//// Unless required by applicable law or agreed to in writing, +//// software distributed under the License is distributed on an +//// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +//// KIND, either express or implied. See the License for the +//// specific language governing permissions and limitations +//// under the License. +// +//#include <gtest/gtest.h> +//#include <atomic> +//#include <thread> +//#include "vec/exec/scan/MultilevelSplitQueue.h" +// +//namespace doris::vectorized { +// +//// 测试用SplitRunner实现 +//class TestSplitRunner : public PrioritizedSplitRunner { +//public: +// TestSplitRunner(Priority priority) +// : PrioritizedSplitRunner(_global_cpu_time_micros, _global_scheduled_time_micros, _blocked_quanta_wall_time, _unblocked_quanta_wall_time), _priority(priority) {} +// +// Priority getPriority() const override { return _priority; } +// void updatePriority(Priority new_pri) { _priority = new_pri; } +// +//private: +// Priority _priority; +// CounterStats _global_cpu_time_micros; +// CounterStats _global_scheduled_time_micros; +// +// TimeStats _blocked_quanta_wall_time; +// TimeStats _unblocked_quanta_wall_time; +//}; +// +//class MultilevelSplitQueueTest : public testing::Test { +//protected: +// void SetUp() override { +// queue = std::make_unique<MultilevelSplitQueue>(2.0); +// } +// +// std::shared_ptr<TestSplitRunner> make_split(int level, int64_t pri) { +// return std::make_shared<TestSplitRunner>(Priority(level, pri)); 
+// } +// +// std::unique_ptr<MultilevelSplitQueue> queue; +//}; +// +//TEST_F(MultilevelSplitQueueTest, BasicFunction) { +// // 验证空队列 +// EXPECT_EQ(queue->size(), 0); +// +// // 添加任务 +// auto split = make_split(0, 100); +// queue->addSplit(split); +// EXPECT_EQ(queue->size(), 1); +// +// // 取出任务 +// auto task = queue->pollSplit(); +// ASSERT_NE(task, nullptr); +// EXPECT_EQ(task->getPriority().getLevel(), 0); +// EXPECT_EQ(queue->size(), 0); +//} +// +//TEST_F(MultilevelSplitQueueTest, LevelContributionCap) { +// // 创建两个测试任务 +// auto split0 = make_split(0, 0); +// auto split1 = make_split(0, 0); +// +// // 获取级别阈值数组(假设实现中有这个静态成员) +// const auto& levelThresholds = MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS; +// +// int64_t scheduledNanos = 0; +// for (size_t i = 0; i < levelThresholds.size() - 1; ++i) { +// // 计算需要增加的时间量(秒转纳秒) +// const int64_t levelAdvance = +// (levelThresholds[i + 1] - levelThresholds[i]) * 1'000'000'000LL; +// // 模拟任务累积时间 +// scheduledNanos += levelAdvance; +// split0->updatePriority(queue->updatePriority( +// split0->getPriority(), levelAdvance, scheduledNanos)); +// split1->updatePriority(queue->updatePriority( +// split1->getPriority(), levelAdvance, scheduledNanos)); +// +// // 验证级别提升 +// EXPECT_EQ(split0->getPriority().getLevel(), i + 1); +// EXPECT_EQ(split1->getPriority().getLevel(), i + 1); +// +// // 验证级别累计时间(应用贡献上限) +// const int64_t expected = +// 2 * std::min(levelAdvance, MultilevelSplitQueue::LEVEL_CONTRIBUTION_CAP); +// EXPECT_EQ(queue->get_level_scheduled_time(i), expected); +// EXPECT_EQ(queue->get_level_scheduled_time(i + 1), 0); +// } +//} +// +//TEST_F(MultilevelSplitQueueTest, LevelContributionWithCap) { +// // 创建队列(levelTimeMultiplier=2) +// MultilevelSplitQueue queue(2.0); +// +// // 创建测试任务并初始化优先级 +// auto split = make_split(0, 0); +// queue.addSplit(split); +// +// // 模拟运行10分钟(转换为纳秒) +// constexpr int64_t quantaNanos = 10LL * 60 * 1'000'000'000; // 10分钟 +// int64_t scheduledNanos = 0; +// +// // 更新优先级触发时间分配 +// auto 
task = std::static_pointer_cast<TestSplitRunner>(queue.pollSplit()); +// scheduledNanos += quantaNanos; +// Priority new_pri = queue.updatePriority( +// task->getPriority(), +// quantaNanos, +// scheduledNanos +// ); +// task->updatePriority(new_pri); +// +// // 计算受限制的时间贡献 +// int64_t cappedNanos = std::min(quantaNanos, MultilevelSplitQueue::LEVEL_CONTRIBUTION_CAP); +// const auto& levels = MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS; +// +// // 验证每个级别的时间分配 +// for (size_t i = 0; i < levels.size() - 1; ++i) { +// // 计算当前级别时间阈值(转换为纳秒) +// int64_t levelThreshold = static_cast<int64_t>((levels[i + 1] - levels[i])) * 1'000'000'000L; +// int64_t expected = std::min(levelThreshold, cappedNanos); +// +// // 验证当前级别累计时间 +// EXPECT_EQ(queue.get_level_scheduled_time(i), expected); +// cappedNanos -= expected; +// } +// +// // 验证剩余时间分配到最后一个级别 +//// EXPECT_EQ(queue.get_level_scheduled_time(levels.size() - 1), cappedNanos); +//} +// +//TEST_F(MultilevelSplitQueueTest, TimeCompensation) { +// // 初始状态验证 +// EXPECT_EQ(queue->get_level_scheduled_time(2), 0); +// +// int64_t scheduledNanos = 0L; +// +// // 1. 创建并添加任务 +// auto split = make_split(2, 200); +// queue->addSplit(split); +// +// // 2. 取出任务(触发时间补偿) +// auto task = std::static_pointer_cast<TestSplitRunner>(queue->pollSplit()); +// ASSERT_NE(task, nullptr); +// +// // 3. 模拟任务运行5秒并更新优先级 +// const int64_t RUN_TIME = 5'000'000'000; // 5秒 +// scheduledNanos += RUN_TIME; +// Priority new_pri = queue->updatePriority( +// task->getPriority(), +// RUN_TIME, +// scheduledNanos +// ); +// task->updatePriority(new_pri); +// +// // 4. 重新加入队列(此时level2已有时间记录) +// queue->addSplit(task); +// +// // 5. 
再次添加新任务触发补偿 +// auto new_split = make_split(2, 300); +// queue->addSplit(new_split); +// +// // 验证补偿逻辑 +// int64_t level2_time = queue->get_level_scheduled_time(2); +// int64_t level0_time = queue->get_level0_target_time(); +// +// // 预期补偿 = (level0_time / 4) + 5秒 +// int64_t expected = (level0_time / 4) + RUN_TIME; +// +// // 允许10%误差 +// EXPECT_GT(level2_time, expected - expected/10); +// EXPECT_LT(level2_time, expected + expected/10); +//} +// +//TEST_F(MultilevelSplitQueueTest, PriorityUpdate) { +// auto split = make_split(0, 0); +// queue->addSplit(split); +// +// // 模拟运行30秒 +// auto split_ptr = std::static_pointer_cast<TestSplitRunner>(queue->pollSplit()); +// ASSERT_NE(split_ptr, nullptr); +// Priority new_pri = queue->updatePriority( +// split_ptr->getPriority(), +// 30'000'000'000LL, // 30秒 +// 0 +// ); +// +// // 验证升级到level3 +// EXPECT_EQ(new_pri.getLevel(), 3); +// EXPECT_GT(new_pri.getLevelPriority(), 30'000'000'000LL); +//} +// +//TEST_F(MultilevelSplitQueueTest, LevelSelection) { +// // 准备不同级别的任务 +// queue->addSplit(make_split(0, 100)); // level0 +// queue->addSplit(make_split(3, 50)); // level3 +// +// // 多次轮询触发时间补偿 +// for (int i = 0; i < 3; ++i) { +// auto task = queue->pollSplit(); +// queue->addSplit(task); +// } +// +// // 应优先选择level3 +// auto task = queue->pollSplit(); +// EXPECT_EQ(task->getPriority().getLevel(), 3); +//} +// +//TEST_F(MultilevelSplitQueueTest, Concurrency) { +// constexpr int THREAD_NUM = 4; +// constexpr int OPS = 1000; +// std::vector<std::thread> workers; +// +// for (int i = 0; i < THREAD_NUM; ++i) { +// workers.emplace_back([&] { +// for (int j = 0; j < OPS; ++j) { +// auto split = make_split(j % 5, j); +// queue->addSplit(split); +// if (auto task = queue->pollSplit()) { +// // 模拟处理任务 +// std::this_thread::sleep_for(std::chrono::microseconds(10)); +// } +// } +// }); +// } +// +// for (auto& t : workers) t.join(); +// SUCCEED(); // 验证无崩溃 +//} +// +//TEST_F(MultilevelSplitQueueTest, Interrupt) { +// std::atomic<bool> 
stopped{false}; +// std::thread consumer([&] { +// while (!stopped) { +// if (auto task = queue->pollSplit()) { +// // process task +// } +// } +// }); +// +// std::this_thread::sleep_for(std::chrono::milliseconds(10)); +// queue->interrupt(); +// stopped = true; +// consumer.join(); +//} +// +//} // namespace doris::vectorized \ No newline at end of file
diff --git a/be/test/vec/exec/executor/time_sharing/split_concurrency_controller_test.cpp b/be/test/vec/exec/executor/time_sharing/split_concurrency_controller_test.cpp new file mode 100644 index 0000000..58868a9 --- /dev/null +++ b/be/test/vec/exec/executor/time_sharing/split_concurrency_controller_test.cpp
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/executor/time_sharing/split_concurrency_controller.h"
+
+#include <gtest/gtest.h>
+
+#include <chrono>
+
+namespace doris::vectorized {
+
+using namespace std::chrono_literals;
+
+class SplitConcurrencyControllerTest : public testing::Test {};
+
+// With utilization 0.0, each 2-second update is expected to raise the target by one.
+TEST_F(SplitConcurrencyControllerTest, test_ramp_up) {
+    SplitConcurrencyController controller(1, std::chrono::seconds(1));
+
+    int iteration = 0;
+    while (iteration < 10) {
+        const int running_splits = iteration + 1;
+        controller.update(2'000'000'000, 0.0, running_splits);
+        EXPECT_EQ(controller.target_concurrency(), running_splits + 1)
+                << "Rampup failed at iteration " << iteration;
+        ++iteration;
+    }
+}
+
+// With utilization 1.0, each update + finished split is expected to lower the target by one.
+TEST_F(SplitConcurrencyControllerTest, test_ramp_down) {
+    SplitConcurrencyController controller(10, std::chrono::seconds(1));
+
+    // running_splits counts down 10, 9, ..., 2 (nine rounds).
+    for (int running_splits = 10; running_splits > 1; --running_splits) {
+        controller.update(2'000'000'000, 1.0, running_splits);
+        controller.split_finished(30'000'000'000, 1.0, running_splits);
+        EXPECT_EQ(controller.target_concurrency(), running_splits - 1)
+                << "Rampdown failed at iteration " << (10 - running_splits);
+    }
+}
+
+TEST_F(SplitConcurrencyControllerTest, test_rapid_adjust_for_quick_splits) {
+    SplitConcurrencyController controller(10, std::chrono::seconds(1));
+
+    // Rapid decrease: short updates and short splits at full utilization.
+    for (int running_splits = 10; running_splits > 1; --running_splits) {
+        controller.update(200'000'000, 1.0, running_splits);
+        controller.split_finished(100'000'000, 1.0, running_splits);
+        EXPECT_EQ(controller.target_concurrency(), running_splits - 1)
+                << "Rapid decrease failed at iteration " << (10 - running_splits);
+    }
+
+    // Reset the state with one long idle update, then test the rapid increase.
+    controller.update(30'000'000'000, 0.0, 1);
+    int iteration = 0;
+    while (iteration < 10) {
+        const int running_splits = iteration + 1;
+        controller.update(200'000'000, 0.0, running_splits);
+        controller.split_finished(100'000'000, 0.0, running_splits);
+        EXPECT_EQ(controller.target_concurrency(), running_splits + 1)
+                << "Rapid increase failed at iteration " << iteration;
+        ++iteration;
+    }
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/test/vec/exec/executor/time_sharing/time_sharing_task_executor_test.cpp b/be/test/vec/exec/executor/time_sharing/time_sharing_task_executor_test.cpp new file mode 100644 index 0000000..104c626 --- /dev/null +++ b/be/test/vec/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
@@ -0,0 +1,1238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/executor/time_sharing/time_sharing_task_executor.h"
+
+#include <gtest/gtest.h>
+
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <cstdint>
+#include <cstdio>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <stdexcept>
+#include <string>
+#include <thread>
+#include <utility>
+
+#include "vec/exec/executor/ticker.h"
+#include "vec/exec/executor/time_sharing/time_sharing_task_handle.h"
+
+namespace doris {
+namespace vectorized {
+
+// Test-only rendezvous point with a dynamic number of parties.
+// `_count` is the number of arrivals still missing in the current generation. When it reaches
+// zero it is reset to `_parties`, `_generation` is incremented, and all waiters are released.
+class PhaseController {
+public:
+    explicit PhaseController(int parties) : _parties(parties), _count(parties), _generation(0) {}
+
+    // Registers one arrival. The last party to arrive advances the generation; every other
+    // party blocks until the generation has advanced (or `_count` is observed at zero).
+    // Returns the value of `_generation` at the time the caller leaves.
+    int arrive_and_await() {
+        std::unique_lock<std::mutex> lock(_mutex);
+        int current_generation = _generation;
+        const int arrived_count = --_count;
+
+        if (arrived_count == 0) {
+            // Reset the counter and move on to the next generation.
+            _count = _parties;
+            _generation++;
+            _cv.notify_all();
+        } else {
+            // Wait until the next generation has started.
+            _cv.wait(lock, [this, current_generation] {
+                return _generation > current_generation || _count == 0;
+            });
+        }
+        return _generation;
+    }
+
+    // Registers one arrival and removes the caller from the set of parties. Never blocks.
+    void arrive_and_deregister() {
+        std::unique_lock<std::mutex> lock(_mutex);
+        _parties--; // permanently reduce the number of parties
+        const int arrived_count = --_count;
+
+        if (arrived_count == 0) {
+            // This was the last missing arrival: advance the generation right away.
+            _count = _parties;
+            _generation++;
+            _cv.notify_all();
+        } else {
+            // Do not block; just wake up any threads that may be waiting.
+            _cv.notify_all();
+        }
+    }
+
+    // Adds one party; it counts towards the current generation immediately.
+    void register_party() {
+        std::lock_guard<std::mutex> lock(_mutex);
+        _parties++;
+        _count++;         // the new party takes effect immediately
+        _cv.notify_all(); // wake up any threads that may be waiting
+        fprintf(stderr, "[PhaseController] 注册新参与者,当前总参与方=%d\n", _parties);
+    }
+
+    int parties() const {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return _parties;
+    }
+
+    int count() const {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return _count;
+    }
+
+    int generation() const {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return _generation;
+    }
+
+private:
+    mutable std::mutex _mutex;
+    std::condition_variable _cv;
+    int _parties;
+    int _count;
+    int _generation; // generation counter
+};
+
+// Thread-safe Ticker for tests; time only moves when `increment` is called.
+class TestingTicker final : public Ticker {
+public:
+    TestingTicker() : time_(0) {}
+
+    int64_t read() const override { return time_.load(std::memory_order_relaxed); }
+
+    auto now() const { return time_.load(); }
+
+    // Advances the clock by `delta` * `unit`, in nanoseconds.
+    // Throws std::invalid_argument when `delta` is negative.
+    void increment(int64_t delta, std::chrono::nanoseconds unit) {
+        if (delta < 0) {
+            throw std::invalid_argument("delta is negative");
+        }
+
+        std::lock_guard<std::mutex> lock(mutex_);
+        time_ += unit.count() * delta;
+    }
+
+private:
+    std::atomic<int64_t> time_;
+    mutable std::mutex mutex_;
+};
+
+// SplitRunner for tests whose quanta are driven in lock step through three PhaseControllers
+// (global, begin, end). The runner finishes after `required_phases` calls to `process_for`.
+class TestingSplitRunner : public SplitRunner {
+public:
+    // Registers this runner as a party on the begin and end controllers; the global controller
+    // only gets a party registered when it has none yet.
+    TestingSplitRunner(std::string name, std::shared_ptr<TestingTicker> ticker,
+                       std::shared_ptr<PhaseController> global_controller,
+                       std::shared_ptr<PhaseController> begin_controller,
+                       std::shared_ptr<PhaseController> end_controller, int required_phases,
+                       int quanta_time_ms)
+            : _name(std::move(name)),
+              _ticker(ticker),
+              _global_controller(global_controller),
+              _begin_controller(begin_controller),
+              _end_controller(end_controller),
+              _required_phases(required_phases),
+              _quanta_time(quanta_time_ms),
+              _completed_phases(0),
+              _first_phase(-1),
+              _last_phase(-1),
+              _started(false) {
+        _begin_controller->register_party();
+        _end_controller->register_party();
+
+        if (_global_controller->parties() == 0) {
+            _global_controller->register_party();
+        }
+    }
+
+    Status init() override { return Status::OK(); }
+
+    // Runs one quantum: advances the test ticker by `_quanta_time` milliseconds, then passes
+    // the global, begin and end rendezvous in that order. Once `_required_phases` quanta are
+    // done, the runner deregisters from all three controllers and completes
+    // `_completion_future`. Always returns an already-ready future.
+    Result<SharedListenableFuture<Void>> process_for(std::chrono::nanoseconds) override {
+        _started = true;
+        _ticker->increment(_quanta_time, std::chrono::milliseconds(1));
+
+        // Global phase synchronization
+        _global_controller->arrive_and_await();
+
+        // Begin quanta phase
+        int generation = _begin_controller->arrive_and_await();
+        int expected = -1;
+        // One-shot "set if still -1": this must be the strong form, because
+        // compare_exchange_weak may fail spuriously and there is no retry loop here.
+        _first_phase.compare_exchange_strong(expected, generation - 1, std::memory_order_relaxed);
+        _last_phase.store(generation, std::memory_order_relaxed);
+
+        // End quanta phase
+        _end_controller->arrive_and_await();
+
+        if (++_completed_phases >= _required_phases) {
+            _end_controller->arrive_and_deregister();
+            _begin_controller->arrive_and_deregister();
+            _global_controller->arrive_and_deregister();
+            // Complete the overall future only once the whole split is finished.
+            _completion_future.set_value(Void {});
+        }
+
+        return SharedListenableFuture<Void>::create_ready();
+    }
+
+    int completed_phases() const { return _completed_phases; }
+
+    // Generation before the first begin-rendezvous this runner took part in, or -1 if none yet.
+    int first_phase() const { return _first_phase; }
+
+    // Generation returned by the most recent begin-rendezvous, or -1 if none yet.
+    int last_phase() const { return _last_phase; }
+
+    void close(const Status& status) override {
+        // Implementation needed
+    }
+
+    std::string get_info() const override {
+        // Implementation needed
+        return "";
+    }
+
+    bool is_finished() override { return _completion_future.is_ready(); }
+
+    Status finished_status() override { return _completion_future.get_status(); }
+
+    bool is_started() const { return _started.load(); }
+
+private:
+    std::string _name;
+    std::shared_ptr<TestingTicker> _ticker;
+    std::shared_ptr<PhaseController> _global_controller;
+    std::shared_ptr<PhaseController> _begin_controller;
+    std::shared_ptr<PhaseController> _end_controller;
+    int _required_phases;
+    int _quanta_time;
+
+    std::atomic<int> _completed_phases;
+    std::atomic<int> _first_phase;
+    std::atomic<int> _last_phase;
+    std::atomic<bool> _started;
+    // Completed once `_required_phases` quanta have run.
+    ListenableFuture<Void> _completion_future {};
+};
+
+class TimeSharingTaskExecutorTest : public testing::Test {
+protected:
+    void SetUp() override {}
+
+    void TearDown() override {}
+
+private:
+    // Template so that different container types are supported.
+    template <typename Container>
+    void assert_split_states(int end_index, const Container& splits) {
+        // Print the started state of every split.
+        fprintf(stderr, "[验证] 所有split启动状态总览(共%zu个):\n", splits.size());
+        for (int i = 0; i < splits.size(); ++i) {
+            fprintf(stderr, " → split[%d] 启动状态: %s\n", i,
+                    splits[i]->is_started() ? "true" : "false");
+        }
+
+        // Splits up to and including end_index must have been started.
+        fprintf(stderr, "\n[验证] 详细检查0-%d号split应已启动:\n", end_index);
+        for (int i = 0; i <= end_index; ++i) {
+            bool actual = splits[i]->is_started();
+            fprintf(stderr, " → split[%d] 预期启动=true,实际=%s\n", i, actual ? "true" : "false");
+            EXPECT_TRUE(actual) << "Split " << i << " 应该已启动";
+        }
+
+        // Splits after end_index must not have been started.
+        fprintf(stderr, "\n[验证] 详细检查%d-%zu号split应未启动:\n", end_index + 1,
+                splits.size() - 1);
+        for (int i = end_index + 1; i < splits.size(); ++i) {
+            bool actual = splits[i]->is_started();
+            fprintf(stderr, " → split[%d] 预期启动=false,实际=%s\n", i,
+                    actual ?
"true" : "false"); + EXPECT_FALSE(actual) << "Split " << i << " 应该未启动"; + } + + fprintf(stderr, "[验证] 完整状态验证完成\n\n"); + } + + std::mt19937_64 _rng {std::random_device {}()}; +}; + +// 辅助函数创建测试任务 +//std::shared_ptr<TestingSplitRunner> create_testing_split( +// std::shared_ptr<TestingTicker> ticker, +// std::shared_ptr<PhaseController> begin, +// std::shared_ptr<PhaseController> end, +// int required_phases) { +// return std::make_shared<TestingSplitRunner>( +// "driver", ticker, +// std::make_shared<PhaseController>(0), // global_controller +// begin, +// end, +// required_phases, +// 0); +//} + +// 修改原有测试用例(示例) +TEST_F(TimeSharingTaskExecutorTest, TestTasksComplete) { + auto ticker = std::make_shared<TestingTicker>(); + // const auto stuck_threshold = std::chrono::minutes(10); + + TimeSharingTaskExecutor executor(4, 8, 3, 4, ticker); + ASSERT_TRUE(executor.init().ok()); + ASSERT_TRUE(executor.start().ok()); + + try { + ticker->increment(20, std::chrono::milliseconds(1)); + TaskId task_id("test_task"); + auto task_handle = TEST_TRY(executor.create_task( + task_id, []() { return 0.0; }, 10, std::chrono::milliseconds(1), std::nullopt)); + + auto begin_phase = std::make_shared<PhaseController>(1); + auto verification_complete = std::make_shared<PhaseController>(1); + + // 添加两个初始任务 + auto driver1 = std::make_shared<TestingSplitRunner>( + "driver1", ticker, + std::make_shared<PhaseController>(0), // global_controller + begin_phase, verification_complete, 10, 0); + auto driver2 = std::make_shared<TestingSplitRunner>( + "driver2", ticker, + std::make_shared<PhaseController>(0), // global_controller + begin_phase, verification_complete, 10, 0); + executor.enqueue_splits(task_handle, true, {driver1, driver2}); + + // 验证初始状态 + EXPECT_EQ(driver1->completed_phases(), 0); + EXPECT_EQ(driver2->completed_phases(), 0); + + // 触发第一阶段 + begin_phase->arrive_and_await(); + ticker->increment(60, std::chrono::seconds(1)); + + // 验证停滞任务检测 + // auto stuck_splits = 
executor.get_stuck_split_task_ids(stuck_threshold, [](auto&){ return true; }); + // EXPECT_TRUE(stuck_splits.empty()) << "应无停滞任务"; + // EXPECT_EQ(executor.get_runaway_split_count(), 0) << "失控任务计数应为0"; + + // 触发任务超时 + ticker->increment(600, std::chrono::seconds(1)); + // stuck_splits = executor.get_stuck_split_task_ids(stuck_threshold, [](auto&){ return true; }); + // EXPECT_EQ(stuck_splits.size(), 1) << "应检测到1个停滞任务"; + // EXPECT_EQ(*stuck_splits.begin(), task_id) << "停滞任务ID不匹配"; + // EXPECT_EQ(executor.get_runaway_split_count(), 2) << "应检测到2个失控任务"; + verification_complete->arrive_and_await(); + + // advance one phase and verify + begin_phase->arrive_and_await(); + EXPECT_EQ(driver1->completed_phases(), 1); + EXPECT_EQ(driver2->completed_phases(), 1); + verification_complete->arrive_and_await(); + + // add one more job + auto driver3 = std::make_shared<TestingSplitRunner>( + "driver3", ticker, + std::make_shared<PhaseController>(0), // global_controller + begin_phase, verification_complete, 10, 0); + executor.enqueue_splits(task_handle, false, {driver3}); + // advance one phase and verify + begin_phase->arrive_and_await(); + EXPECT_EQ(driver1->completed_phases(), 2); + EXPECT_EQ(driver2->completed_phases(), 2); + EXPECT_EQ(driver3->completed_phases(), 0); + // future1.get(1, SECONDS); + // future2.get(1, SECONDS); + verification_complete->arrive_and_await(); + + // advance to the end of the first two task and verify + begin_phase->arrive_and_await(); + for (int i = 0; i < 7; i++) { + verification_complete->arrive_and_await(); + begin_phase->arrive_and_await(); + EXPECT_EQ(begin_phase->generation(), verification_complete->generation() + 1); + } + EXPECT_EQ(driver1->completed_phases(), 10); + EXPECT_EQ(driver2->completed_phases(), 10); + EXPECT_EQ(driver3->completed_phases(), 8); + // future3.get(1, SECONDS); + verification_complete->arrive_and_await(); + + // advance two more times and verify + begin_phase->arrive_and_await(); + 
verification_complete->arrive_and_await(); + begin_phase->arrive_and_await(); + EXPECT_EQ(driver1->completed_phases(), 10); + EXPECT_EQ(driver2->completed_phases(), 10); + EXPECT_EQ(driver3->completed_phases(), 10); + // future3.get(1, SECONDS); + verification_complete->arrive_and_await(); + // + EXPECT_EQ(driver1->first_phase(), 0); + EXPECT_EQ(driver2->first_phase(), 0); + EXPECT_EQ(driver3->first_phase(), 2); + EXPECT_EQ(driver1->last_phase(), 10); + EXPECT_EQ(driver2->last_phase(), 10); + EXPECT_EQ(driver3->last_phase(), 12); + + // 清理后验证无滞留任务 + ticker->increment(610, std::chrono::seconds(1)); + // stuck_splits = executor.get_stuck_split_task_ids(stuck_threshold, [](auto&){ return true; }); + // EXPECT_TRUE(stuck_splits.empty()) << "最终应无停滞任务"; + // EXPECT_EQ(executor.get_runaway_split_count(), 0) << "最终失控任务计数应为0"; + } catch (...) { + executor.stop(); + throw; + } + executor.stop(); +} + +TEST_F(TimeSharingTaskExecutorTest, TestQuantaFairness) { + constexpr int TOTAL_PHASES = 11; + std::shared_ptr<TestingTicker> ticker = std::make_shared<TestingTicker>(); + + TimeSharingTaskExecutor executor(1, 2, 3, 4, ticker); + ASSERT_TRUE(executor.init().ok()); + ASSERT_TRUE(executor.start().ok()); + + ticker->increment(20, std::chrono::milliseconds(1)); + // 创建两个独立的任务句柄 + TaskId short_task_id("short_quanta"); + TaskId long_task_id("long_quanta"); + auto short_handle = TEST_TRY(executor.create_task( + short_task_id, []() { return 0.0; }, // 优先级函数 + 10, // 初始并发数 + std::chrono::milliseconds(1), std::nullopt)); + + auto long_handle = TEST_TRY(executor.create_task( + long_task_id, []() { return 0.0; }, 10, std::chrono::milliseconds(1), std::nullopt)); + + auto end_controller = std::make_shared<PhaseController>(0); + + auto short_split = std::make_shared<TestingSplitRunner>( + "short", ticker, std::make_shared<PhaseController>(0), + std::make_shared<PhaseController>(0), end_controller, TOTAL_PHASES, 10); + + auto long_split = std::make_shared<TestingSplitRunner>( + "long", 
ticker, std::make_shared<PhaseController>(0), + std::make_shared<PhaseController>(0), end_controller, TOTAL_PHASES, 20); + + // 添加任务到对应的句柄 + executor.enqueue_splits(short_handle, true, {short_split}); + executor.enqueue_splits(long_handle, true, {long_split}); + + // 修改主线程同步逻辑 + for (int i = 0; i < TOTAL_PHASES; ++i) { + end_controller->arrive_and_await(); + } + + // fprintf(stderr, "验证阶段数\n"); + + // 验证阶段数 + EXPECT_GE(short_split->completed_phases(), 7); + EXPECT_LE(short_split->completed_phases(), 8); + EXPECT_GE(long_split->completed_phases(), 3); + EXPECT_LE(long_split->completed_phases(), 4); + end_controller->arrive_and_deregister(); + executor.stop(); +} + +TEST_F(TimeSharingTaskExecutorTest, TestLevelMovement) { + // fprintf(stderr, "[%zu] TestLevelMovement\n", + // std::hash<std::thread::id> {}(std::this_thread::get_id())); + + std::shared_ptr<TestingTicker> ticker = std::make_shared<TestingTicker>(); + TimeSharingTaskExecutor executor(2, 2, 3, 4, ticker); + ASSERT_TRUE(executor.init().ok()); + ASSERT_TRUE(executor.start().ok()); + + try { + // 初始化时间 + ticker->increment(20, std::chrono::milliseconds(1)); + + // 创建任务句柄 + TaskId task_id("test_task"); + auto task_handle = + std::static_pointer_cast<TimeSharingTaskHandle>(TEST_TRY(executor.create_task( + task_id, []() { return 0.0; }, // 优先级函数 + 10, // 初始并发数 + std::chrono::milliseconds(1), std::nullopt))); + + // 同步控制器 + auto global_controller = std::make_shared<PhaseController>(3); // 2个执行线程 + 主线程 + const int quanta_time_ms = 500; + const int phases_per_second = 1000 / quanta_time_ms; + const int total_phases = + MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS + [MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS.size() - 1] * + phases_per_second; // 假设最后一级阈值 + + // fprintf(stderr, "total_phases: %d\n", total_phases); + // 创建测试任务 + auto driver1 = std::make_shared<TestingSplitRunner>( + "driver1", ticker, global_controller, std::make_shared<PhaseController>(0), + std::make_shared<PhaseController>(0), total_phases, 
quanta_time_ms); + auto driver2 = std::make_shared<TestingSplitRunner>( + "driver2", ticker, global_controller, std::make_shared<PhaseController>(0), + std::make_shared<PhaseController>(0), total_phases, quanta_time_ms); + + // 添加任务 + executor.enqueue_splits(task_handle, false, {driver1, driver2}); + + int completed_phases = 0; + for (int i = 0; i < MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS.size() - 1; ++i) { + const int target_seconds = MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS[i + 1]; + while ((completed_phases / phases_per_second) < target_seconds) { + // fprintf(stderr, "completed_phases: %d, phases_per_second: %d, target_seconds: %d\n", + // completed_phases, phases_per_second, target_seconds); + // fprintf(stderr, + // "[%zu] TestLevelMovement _global_controller.arrive_and_await()" + // " _global_controller.count(): %d, _global_controller.parties(): %d;\n", + // std::hash<std::thread::id> {}(std::this_thread::get_id()), + // global_controller->count(), global_controller->parties()); + global_controller->arrive_and_await(); + // fprintf(stderr, + // "[%zu] TestLevelMovement _global_controller.arrive_and_await() finished;\n", + // std::hash<std::thread::id> {}(std::this_thread::get_id())); + ++completed_phases; + } + + // 验证优先级级别 + EXPECT_EQ(task_handle->priority().level(), i + 1); + } + + global_controller->arrive_and_deregister(); + } catch (...) 
{ + executor.stop(); + throw; + } + executor.stop(); +} + +TEST_F(TimeSharingTaskExecutorTest, TestLevelMultipliers) { + constexpr int TASK_THREADS = 6; + constexpr int CONCURRENCY = 3; + + std::shared_ptr<TestingTicker> ticker = std::make_shared<TestingTicker>(); + std::shared_ptr<MultilevelSplitQueue> split_queue = + std::make_shared<MultilevelSplitQueue>(2); // 初始化多级队列 + TimeSharingTaskExecutor executor(TASK_THREADS, CONCURRENCY, 3, 4, ticker, + std::chrono::milliseconds(60000), split_queue); + ASSERT_TRUE(executor.init().ok()); + ASSERT_TRUE(executor.start().ok()); + + try { + ticker->increment(20, std::chrono::milliseconds(1)); + + for (int i = 0; + i < (sizeof(MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS) / sizeof(int) - 1); ++i) { + // 创建三个任务句柄 + TaskId task_id1("test1"), task_id2("test2"), task_id3("test3"); + auto task_handle1 = TEST_TRY(executor.create_task( + task_id1, []() { return 0.0; }, 10, std::chrono::milliseconds(1), + std::nullopt)); + auto task_handle2 = TEST_TRY(executor.create_task( + task_id2, []() { return 0.0; }, 10, std::chrono::milliseconds(1), + std::nullopt)); + auto task_handle3 = TEST_TRY(executor.create_task( + task_id3, []() { return 0.0; }, 10, std::chrono::milliseconds(1), + std::nullopt)); + + // 移动任务0到下一级别 + auto task0_job = std::make_shared<TestingSplitRunner>( + "task0", ticker, std::make_shared<PhaseController>(0), + std::make_shared<PhaseController>(0), std::make_shared<PhaseController>(0), 1, + MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS[i + 1] * 1000); + executor.enqueue_splits(task_handle1, false, {task0_job}); + + // 移动任务1和2到当前级别 + auto task1_job = std::make_shared<TestingSplitRunner>( + "task1", ticker, std::make_shared<PhaseController>(0), + std::make_shared<PhaseController>(0), std::make_shared<PhaseController>(0), 1, + MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS[i] * 1000); + auto task2_job = std::make_shared<TestingSplitRunner>( + "task2", ticker, std::make_shared<PhaseController>(0), + 
std::make_shared<PhaseController>(0), std::make_shared<PhaseController>(0), 1, + MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS[i] * 1000); + executor.enqueue_splits(task_handle2, false, {task1_job}); + executor.enqueue_splits(task_handle3, false, {task2_job}); + + // 等待任务完成 + while (!task0_job->is_finished() || !task1_job->is_finished() || + !task2_job->is_finished()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + // fprintf(stderr, "start new task\n"); + // 启动新任务 + auto global_controller = std::make_shared<PhaseController>(7); // 6个执行线程 + 主线程 + const int phases_for_next_level = MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS[i + 1] - + MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS[i]; + std::array<std::shared_ptr<TestingSplitRunner>, 6> drivers; + + for (int j = 0; j < 6; ++j) { + drivers[j] = std::make_shared<TestingSplitRunner>( + "driver" + std::to_string(j), ticker, global_controller, + std::make_shared<PhaseController>(0), std::make_shared<PhaseController>(0), + phases_for_next_level, 1000); + } + + executor.enqueue_splits(task_handle1, false, {drivers[0], drivers[1]}); + executor.enqueue_splits(task_handle2, false, {drivers[2], drivers[3]}); + executor.enqueue_splits(task_handle3, false, {drivers[4], drivers[5]}); + + // 收集初始阶段数 + int lower_level_start = 0, higher_level_start = 0; + for (int j = 2; j < 6; ++j) lower_level_start += drivers[j]->completed_phases(); + for (int j = 0; j < 2; ++j) higher_level_start += drivers[j]->completed_phases(); + + // 运行所有任务 + while (std::any_of(drivers.begin(), drivers.end(), + [](auto& d) { return !d->is_finished(); })) { + global_controller->arrive_and_await(); + + int lower_level_end = 0, higher_level_end = 0; + for (int j = 2; j < 6; ++j) lower_level_end += drivers[j]->completed_phases(); + for (int j = 0; j < 2; ++j) higher_level_end += drivers[j]->completed_phases(); + + int lower_level_time = lower_level_end - lower_level_start; + int higher_level_time = higher_level_end - higher_level_start; + 
+ if (higher_level_time > 20) { + EXPECT_GT(lower_level_time, higher_level_time * 2 - 10); + EXPECT_LT(higher_level_time, lower_level_time * 2 + 10); + } + } + + global_controller->arrive_and_deregister(); + static_cast<void>(executor.remove_task(task_handle1)); + static_cast<void>(executor.remove_task(task_handle2)); + static_cast<void>(executor.remove_task(task_handle3)); + } + } catch (...) { + executor.stop(); + throw; + } + executor.stop(); +} + +TEST_F(TimeSharingTaskExecutorTest, TestTaskHandle) { + std::shared_ptr<TestingTicker> ticker = std::make_shared<TestingTicker>(); + TimeSharingTaskExecutor executor(4, 8, 3, 4, ticker); + ASSERT_TRUE(executor.init().ok()); + ASSERT_TRUE(executor.start().ok()); + + try { + // 创建任务句柄 + TaskId task_id("test_task"); + auto task_handle = + std::static_pointer_cast<TimeSharingTaskHandle>(TEST_TRY(executor.create_task( + task_id, []() { return 0.0; }, 10, std::chrono::milliseconds(1), + std::nullopt))); + + // 准备同步控制器 + auto begin_phase = std::make_shared<PhaseController>(1); + auto verification_complete = std::make_shared<PhaseController>(1); + + // 创建测试任务 + auto driver1 = std::make_shared<TestingSplitRunner>( + "driver1", ticker, + std::make_shared<PhaseController>(0), // global_controller + begin_phase, verification_complete, + 10, // required_phases + 0); // quanta_time + + auto driver2 = std::make_shared<TestingSplitRunner>( + "driver2", ticker, + std::make_shared<PhaseController>(0), // global_controller + begin_phase, verification_complete, + 10, // required_phases + 0); // quanta_time + + // // 强制入队split(不立即执行) + // executor.enqueue_splits(task_handle, true, {driver1}); + // EXPECT_EQ(task_handle->get_running_leaf_splits(), 0) + // << "强制入队后应立即运行的任务数应为0"; + // + // // 正常入队split + // executor.enqueue_splits(task_handle, false, {driver2}); + // EXPECT_EQ(task_handle->get_running_leaf_splits(), 1) + // << "正常入队后应立即运行的任务数应为1"; + + // // 正常入队split + // executor.enqueue_splits(task_handle, false, {driver2}); + // 
EXPECT_EQ(task_handle->get_running_leaf_splits(), 2) + // << "正常入队后应立即运行的任务数应为2"; + + executor.enqueue_splits(task_handle, true, {driver1}); + EXPECT_EQ(task_handle->running_leaf_splits(), 0) << "强制入队后应立即运行的任务数应为0"; + + // 正常入队split + executor.enqueue_splits(task_handle, false, {driver2}); + EXPECT_EQ(task_handle->running_leaf_splits(), 1) << "正常入队后应立即运行的任务数应为1"; + + // 释放同步锁让任务继续执行 + begin_phase->arrive_and_deregister(); + verification_complete->arrive_and_deregister(); + + // 等待任务完成 + while (!driver1->is_finished() || !driver2->is_finished()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } catch (...) { + executor.stop(); + throw; + } + executor.stop(); +} + +// 辅助函数实现(与之前测试保持一致) +void wait_until_splits_start(const std::vector<std::shared_ptr<TestingSplitRunner>>& splits) { + constexpr auto TIMEOUT = std::chrono::seconds(30); + auto start = std::chrono::steady_clock::now(); + + while (std::any_of(splits.begin(), splits.end(), + [](const auto& split) { return !split->is_started(); })) { + if (std::chrono::steady_clock::now() - start > TIMEOUT) { + throw std::runtime_error("等待split启动超时"); + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } +} + +TEST_F(TimeSharingTaskExecutorTest, TestMinMaxDriversPerTask) { + constexpr int MAX_DRIVERS_PER_TASK = 2; + constexpr int TASK_THREADS = 4; + constexpr int CONCURRENCY = 16; + constexpr int BATCH_COUNT = 4; + constexpr int SPLITS_PER_BATCH = 2; + + std::shared_ptr<TestingTicker> ticker = std::make_shared<TestingTicker>(); + std::shared_ptr<MultilevelSplitQueue> split_queue = + std::make_shared<MultilevelSplitQueue>(2); // 初始化多级队列 + TimeSharingTaskExecutor executor(TASK_THREADS, CONCURRENCY, 1, MAX_DRIVERS_PER_TASK, ticker, + std::chrono::milliseconds(60000), split_queue); + ASSERT_TRUE(executor.init().ok()); + ASSERT_TRUE(executor.start().ok()); + + try { + TaskId task_id("test_task"); + auto task_handle = TEST_TRY(executor.create_task( + task_id, []() { return 0.0; }, 10, 
std::chrono::milliseconds(1), std::nullopt)); + + std::array<std::shared_ptr<TestingSplitRunner>, BATCH_COUNT * SPLITS_PER_BATCH> splits; + std::array<std::shared_ptr<PhaseController>, BATCH_COUNT> batch_controllers; + + // 准备4个批次的split,每批2个 + for (int batch = 0; batch < BATCH_COUNT; ++batch) { + batch_controllers[batch] = std::make_shared<PhaseController>(0); + batch_controllers[batch]->register_party(); + auto& controller = batch_controllers[batch]; + + for (int i = 0; i < SPLITS_PER_BATCH; ++i) { + splits[batch * SPLITS_PER_BATCH + i] = std::make_shared<TestingSplitRunner>( + "batch" + std::to_string(batch) + "_split" + std::to_string(i), ticker, + std::make_shared<PhaseController>(0), // global_controller + std::make_shared<PhaseController>(0), // begin_controller + controller, // end_controller + 1, // required_phases + 0); // quanta_time + } + + executor.enqueue_splits( + task_handle, false, + {splits[batch * SPLITS_PER_BATCH], splits[batch * SPLITS_PER_BATCH + 1]}); + } + + // fprintf(stderr, "wait and validation.\n"); + // 验证分批执行逻辑 + for (int current_batch = 0; current_batch < BATCH_COUNT; ++current_batch) { + // 等待当前批次split启动 + // fprintf(stderr, "wait_until_splits_start batch: %d\n", current_batch); + wait_until_splits_start({splits[current_batch * SPLITS_PER_BATCH], + splits[current_batch * SPLITS_PER_BATCH + 1]}); + // fprintf(stderr, "wait_until_splits_start batch finished: %d\n", current_batch); + + // 替换原有的验证逻辑 + assert_split_states(current_batch * SPLITS_PER_BATCH + 1, splits); + + // 完成当前批次 + batch_controllers[current_batch]->arrive_and_deregister(); + } + + // 验证所有split最终完成 + for (auto& split : splits) { + while (!split->is_finished()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + } catch (...) 
{ + executor.stop(); + throw; + } + executor.stop(); +} + +TEST_F(TimeSharingTaskExecutorTest, TestUserSpecifiedMaxDriversPerTask) { + constexpr int TASK_THREADS = 4; + constexpr int CONCURRENCY = 16; + constexpr int BATCH_COUNT = 4; + + std::shared_ptr<TestingTicker> ticker = std::make_shared<TestingTicker>(); + std::shared_ptr<MultilevelSplitQueue> split_queue = std::make_shared<MultilevelSplitQueue>(2); + TimeSharingTaskExecutor executor(TASK_THREADS, CONCURRENCY, 2, 4, ticker, + std::chrono::milliseconds(60000), split_queue); + ASSERT_TRUE(executor.init().ok()); + ASSERT_TRUE(executor.start().ok()); + + try { + // 创建任务并覆盖最大驱动器数为1 + TaskId task_id("user_specified_task"); + auto task_handle = TEST_TRY(executor.create_task( + task_id, []() { return 0.0; }, 10, std::chrono::milliseconds(1), + std::optional<int>(1))); // 显式指定最大驱动器数 + + std::array<std::shared_ptr<TestingSplitRunner>, BATCH_COUNT> splits; + std::array<std::shared_ptr<PhaseController>, BATCH_COUNT> batch_controllers; + + // 准备4个批次的split + for (int batch = 0; batch < BATCH_COUNT; ++batch) { + batch_controllers[batch] = std::make_shared<PhaseController>(0); + batch_controllers[batch]->register_party(); + + splits[batch] = std::make_shared<TestingSplitRunner>( + "batch" + std::to_string(batch), ticker, + std::make_shared<PhaseController>(0), // global_controller + std::make_shared<PhaseController>(0), // begin_controller + batch_controllers[batch], // end_controller + 1, // required_phases + 0); // quanta_time + + executor.enqueue_splits(task_handle, false, {splits[batch]}); + } + + // 验证分批执行逻辑 + for (int current_batch = 0; current_batch < BATCH_COUNT; ++current_batch) { + // 等待当前批次split启动 + wait_until_splits_start({splits[current_batch]}); + + // 验证当前批次状态 + assert_split_states(current_batch, splits); + + // 完成当前批次 + batch_controllers[current_batch]->arrive_and_deregister(); + } + + // 验证所有split最终完成 + for (auto& split : splits) { + while (!split->is_finished()) { + 
std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + } catch (...) { + executor.stop(); + throw; + } + executor.stop(); +} + +TEST_F(TimeSharingTaskExecutorTest, TestMinDriversPerTaskWhenTargetConcurrencyIncreases) { + constexpr int TASK_THREADS = 4; + constexpr int CONCURRENCY = 1; + constexpr int BATCH_COUNT = 3; + + std::shared_ptr<TestingTicker> ticker = std::make_shared<TestingTicker>(); + std::shared_ptr<MultilevelSplitQueue> split_queue = std::make_shared<MultilevelSplitQueue>(2); + TimeSharingTaskExecutor executor(TASK_THREADS, CONCURRENCY, 2, 2, ticker, + std::chrono::milliseconds(60000), split_queue); + ASSERT_TRUE(executor.init().ok()); + ASSERT_TRUE(executor.start().ok()); + + try { + // 创建任务并指定最小并发数 + TaskId task_id("test_query"); + auto task_handle = TEST_TRY(executor.create_task( + task_id, []() { return 0.0; }, // 确保缓冲区未充分利用 + 1, // 初始并发数 + std::chrono::milliseconds(1), + std::optional<int>(2))); // 显式指定最小并发数 + + std::array<std::shared_ptr<TestingSplitRunner>, BATCH_COUNT> splits; + std::array<std::shared_ptr<PhaseController>, BATCH_COUNT> batch_controllers; + + // 创建3个split + for (int batch = 0; batch < BATCH_COUNT; ++batch) { + batch_controllers[batch] = std::make_shared<PhaseController>(0); + batch_controllers[batch]->register_party(); + + splits[batch] = std::make_shared<TestingSplitRunner>( + "split" + std::to_string(batch), ticker, + std::make_shared<PhaseController>(0), // global_controller + std::make_shared<PhaseController>(0), // begin_controller + batch_controllers[batch], // end_controller + 1, // required_phases + 0); // quanta_time + } + + // 批量添加所有split + executor.enqueue_splits(task_handle, false, {splits[0], splits[1], splits[2]}); + + // 等待第一个split启动 + wait_until_splits_start({splits[0]}); + + // 验证只有第一个split启动 + assert_split_states(0, splits); + + // 完成第一个split(应触发并发数调整) + batch_controllers[0]->arrive_and_deregister(); + + // 等待剩余split启动 + wait_until_splits_start({splits[1], splits[2]}); + + // 验证所有split已启动 + 
assert_split_states(2, splits); + + // 完成剩余split + for (int batch = 1; batch < BATCH_COUNT; ++batch) { + batch_controllers[batch]->arrive_and_deregister(); + } + + // 验证所有split完成 + for (auto& split : splits) { + while (!split->is_finished()) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + } catch (...) { + executor.stop(); + throw; + } + executor.stop(); +} + +TEST_F(TimeSharingTaskExecutorTest, TestLeafSplitsSize) {} + +//TEST_F(TimeSharingTaskExecutorTest, TaskRotation) { +// constexpr int TASK_THREADS = 4; +// constexpr int CONCURRENCY = 1; +// constexpr int BATCH_COUNT = 3; +// +// std::shared_ptr<TestingTicker> ticker = std::make_shared<TestingTicker>(); +// std::shared_ptr<MultilevelSplitQueue> split_queue = std::make_shared<MultilevelSplitQueue>(2); +// +// TimeSharingTaskExecutor executor(TASK_THREADS, CONCURRENCY, 2, 2, ticker, +// std::chrono::milliseconds(60000), split_queue); +// ASSERT_TRUE(executor.init().ok()); +// ASSERT_TRUE(executor.start().ok()); +// // 创建三个任务 +// auto task1 = std::make_shared<TimeSharingTaskHandle>( +// "task1", executor->_waiting_splits, []() { return 0.5; }, 1, +// std::chrono::milliseconds(100), 2); +// auto task2 = std::make_shared<TimeSharingTaskHandle>( +// "task2", executor->_waiting_splits, []() { return 0.5; }, 1, +// std::chrono::milliseconds(100), 2); +// auto task3 = std::make_shared<TimeSharingTaskHandle>( +// "task3", executor->_waiting_splits, []() { return 0.5; }, 1, +// std::chrono::milliseconds(100), 2); +// +// // 添加任务到执行器 +// executor->add_task("task1", task1); +// executor->add_task("task2", task2); +// executor->add_task("task3", task3); +// +// // 添加分片到任务 +// auto split1 = std::make_shared<MockSplitRunner>(); +// auto split2 = std::make_shared<MockSplitRunner>(); +// auto split3 = std::make_shared<MockSplitRunner>(); +// task1->enqueue_split(split1); +// task2->enqueue_split(split2); +// task3->enqueue_split(split3); +// +// // 测试任务轮转 +// { +// std::lock_guard<std::mutex> 
// Fixture for the PhaseController tests: before each test it creates a
// controller constructed with THREAD_COUNT as its argument.
class PhaseControllerTest : public testing::Test {
protected:
    const int THREAD_COUNT = 4;
    std::shared_ptr<PhaseController> controller;

    void SetUp() override { controller = std::make_shared<PhaseController>(THREAD_COUNT); }
};

// Basic synchronization test: for GENERATIONS rounds, THREAD_COUNT threads call
// arrive_and_await() and each must observe the 1-based round number as the
// returned generation.
TEST_F(PhaseControllerTest, BasicSynchronization) {
    std::atomic<int> ready {0};
    std::vector<std::thread> threads;
    const int GENERATIONS = 3;

    for (int g = 0; g < GENERATIONS; ++g) {
        for (int i = 0; i < THREAD_COUNT; ++i) {
            threads.emplace_back([&, g] {
                // Start-up rendezvous: the last thread to increment `ready`
                // resets it to 0; the others spin until they see that reset.
                if (ready.fetch_add(1) == THREAD_COUNT - 1) {
                    ready.store(0);
                } else {
                    while (ready.load() != 0)
                        ;
                }

                int gen = controller->arrive_and_await();
                EXPECT_EQ(gen, g + 1) << "代际计数错误";
            });
        }

        // Join this round's threads before inspecting the controller.
        for (auto& t : threads) t.join();
        threads.clear();

        EXPECT_EQ(controller->generation(), g + 1);
        // The message below says the counter should be reset every generation.
        EXPECT_EQ(controller->count(), THREAD_COUNT) << "每代应重置计数器";
    }
}

// Dynamic party management test: parties registered between two phases take
// part in the second phase only.
TEST_F(PhaseControllerTest, DynamicRegistration) {
    constexpr int NEW_PARTIES = 2;
    std::atomic<int> completed {0};
    std::vector<std::thread> threads;
    std::mutex cv_mtx;
    std::condition_variable cv;

    // Replace the fixture's controller with one that also counts the main thread.
    controller = std::make_shared<PhaseController>(THREAD_COUNT + 1);

    // Initial participants (4 threads + the main thread).
    for (int i = 0; i < THREAD_COUNT; ++i) {
        threads.emplace_back([&] {
            // First phase synchronization.
            controller->arrive_and_await();
            // fprintf(stderr, "[initial thread %d] first phase done\n", idx);

            // Second phase synchronization.
            controller->arrive_and_await();
            // fprintf(stderr, "[initial thread %d] second phase done\n", idx);

            completed++;
        });
    }

    // Main thread joins the first phase.
    // fprintf(stderr, "main thread starts first-phase sync\n");
    int main_gen = controller->arrive_and_await();
    fprintf(stderr, "第一阶段完成,当前代际=%d\n", main_gen);

    // Newly registered participants (2 threads).
    std::vector<std::thread> new_threads;
    for (int i = 0; i < NEW_PARTIES; ++i) {
        new_threads.emplace_back([&] {
            {
                // Register while holding cv_mtx, the mutex the main thread
                // waits on below.
                std::lock_guard<std::mutex> lk(cv_mtx);
                controller->register_party();
                // fprintf(stderr, "[new thread %d] registered, parties=%d\n", my_id,
                //         controller->parties());
            }
            cv.notify_one();

            // Takes part in the second phase only.
            controller->arrive_and_await();
            completed++;
            // fprintf(stderr, "[new thread %d] done\n", my_id);
        });
    }

    // Wait until all new threads have registered.
    {
        std::unique_lock<std::mutex> lk(cv_mtx);
        cv.wait(lk, [&] { return controller->parties() == THREAD_COUNT + 1 + NEW_PARTIES; });
    }

    // Main thread triggers the second phase.
    // fprintf(stderr, "main thread starts second-phase sync\n");
    main_gen = controller->arrive_and_await();
    // fprintf(stderr, "second phase done, generation=%d\n", main_gen);

    // Wait for all threads to finish.
    for (auto& t : threads) t.join();
    for (auto& t : new_threads) t.join();

    // Verify the results.
    // fprintf(stderr, "completed=%d (expected=%d)\n", completed.load(), THREAD_COUNT + NEW_PARTIES);
    // fprintf(stderr, "parties=%d (expected=%d)\n", controller->parties(),
    //         THREAD_COUNT + 1 + NEW_PARTIES);

    EXPECT_EQ(completed.load(), THREAD_COUNT + NEW_PARTIES);
    EXPECT_EQ(controller->parties(), THREAD_COUNT + 1 + NEW_PARTIES);
}
The first thread deregisters
+//     threads.emplace_back([&] {
+//         controller->arrive_and_deregister();
+//         EXPECT_EQ(controller->parties(), THREAD_COUNT - 1);
+//     });
+
+//     // The remaining threads complete normally
+//     for (int i = 1; i < THREAD_COUNT; ++i) {
+//         threads.emplace_back([&] {
+//             controller->arrive_and_await();
+//         });
+//     }
+
+//     for (auto& t : threads) t.join();
+
+//     EXPECT_EQ(controller->parties(), THREAD_COUNT - 1);
+//     EXPECT_EQ(controller->count(), THREAD_COUNT - 1) << "注销后计数器应更新";
+// }
+
+// // Mixed-operation stress test
+// TEST_F(PhaseControllerTest, ConcurrencyStressTest) {
+//     const int TOTAL_OPERATIONS = 1000;
+//     std::vector<std::thread> threads;
+//     std::atomic<int> ops {0};
+
+//     auto worker = [&] {
+//         std::random_device rd;
+//         std::mt19937 gen(rd());
+//         std::uniform_int_distribution<> dist(0, 2);
+
+//         while (ops.fetch_add(1) < TOTAL_OPERATIONS) {
+//             switch (dist(gen)) {
+//             case 0:
+//                 controller->arrive_and_await();
+//                 break;
+//             case 1:
+//                 controller->register_party();
+//                 break;
+//             case 2:
+//                 if (controller->parties() > 0) {
+//                     controller->arrive_and_deregister();
+//                 }
+//                 break;
+//             }
+//         }
+//     };
+
+//     // Start the worker threads
+//     for (int i = 0; i < 4; ++i) {
+//         threads.emplace_back(worker);
+//     }
+
+//     // Verify that the final state is consistent
+//     for (auto& t : threads) t.join();
+
+//     EXPECT_GE(controller->parties(), 0) << "参与者数不应为负";
+//     EXPECT_GE(controller->count(), 0) << "计数器不应为负";
+//     EXPECT_GE(controller->generation(), 0) << "代际计数不应回退";
+// }
+
+// // Edge-case test
+// TEST_F(PhaseControllerTest, EdgeCases) {
+//     // Test an empty controller
+//     auto empty_controller = std::make_shared<PhaseController>(0);
+//     EXPECT_EQ(empty_controller->arrive_and_await(), 0);
+
+//     // Test a single participant
+//     auto single_controller = std::make_shared<PhaseController>(1);
+//     std::thread t([&] {
+//         EXPECT_EQ(single_controller->arrive_and_await(), 1);
+//     });
+//     t.join();
+//     EXPECT_EQ(single_controller->generation(), 1);
+
+//     // Test repeated deregistration
+//     auto controller = std::make_shared<PhaseController>(2);
+//     controller->arrive_and_deregister();
+//     
controller->arrive_and_deregister(); +// EXPECT_EQ(controller->parties(), 0); +// } + +} // namespace vectorized +} // namespace doris