[feature](executor) Add time sharing executor framework.
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 841d509..0a6c35e 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -737,6 +737,16 @@
         std::forward<T>(res).value();            \
     });
 
+#define TEST_TRY(stmt)                                                                          \
+    ({                                                                                          \
+        auto&& res = (stmt);                                                                    \
+        using T = std::decay_t<decltype(res)>;                                                  \
+        if (!res.has_value()) [[unlikely]] {                                                    \
+            ASSERT_TRUE(res.has_value()) << "Expected success, but got error: " << res.error(); \
+        }                                                                                       \
+        std::forward<T>(res).value();                                                           \
+    })
+
 } // namespace doris
 
 // specify formatter for Status
diff --git a/be/src/vec/exec/executor/batch_split_scheduler.h b/be/src/vec/exec/executor/batch_split_scheduler.h
new file mode 100644
index 0000000..c614428
--- /dev/null
+++ b/be/src/vec/exec/executor/batch_split_scheduler.h
@@ -0,0 +1,142 @@
+//#pragma once
+//
+//#include <atomic>
+//#include <memory>
+//#include <mutex>
+//#include <condition_variable>
+//#include <thread>
+//#include <queue>
+//
+//#include "common/status.h"
+//#include "vec/exec/scan/split_source_connector.h"
+//#include "vec/exec/scan/vfile_scanner.h"
+//#include "runtime/runtime_state.h"
+//
+//namespace doris::vectorized {
+//
+//// BatchSplitScheduler fetches file splits on a dedicated thread and creates VFileScanners on demand
+//class BatchSplitScheduler {
+//public:
+//    BatchSplitScheduler(std::shared_ptr<SplitSourceConnector> split_source,
+//                       RuntimeState* state,
+//                       const TFileScanNode& scan_node)
+//        : _split_source(split_source)
+//        , _state(state)
+//        , _scan_node(scan_node)
+//        , _running(false)
+//        , _stop(false) {}
+//
+//    ~BatchSplitScheduler() {
+//        stop();
+//    }
+//
+//    Status start() {
+//        if (_running) {
+//            return Status::OK();
+//        }
+//        _running = true;
+//        _stop = false;
+//        _fetch_thread = std::thread(&BatchSplitScheduler::_fetch_loop, this);
+//        return Status::OK();
+//    }
+//
+//    void stop() {
+//        if (!_running) {
+//            return;
+//        }
+//        {
+//            std::lock_guard<std::mutex> l(_lock);
+//            _stop = true;
+//            _cv.notify_one();
+//        }
+//        if (_fetch_thread.joinable()) {
+//            _fetch_thread.join();
+//        }
+//        _running = false;
+//    }
+//
+//    // Fetch an available scanner
+//    Status get_next_scanner(std::shared_ptr<VFileScanner>* scanner) {
+//        std::unique_lock<std::mutex> l(_lock);
+//        while (_scanners.empty() && !_stop && _running) {
+//            _cv.wait(l);
+//        }
+//
+//        if (_stop || !_running) {
+//            return Status::Cancelled("BatchSplitScheduler stopped");
+//        }
+//
+//        if (!_scanners.empty()) {
+//            *scanner = _scanners.front();
+//            _scanners.pop();
+//            return Status::OK();
+//        }
+//
+//        return Status::EndOfFile("No more scanners");
+//    }
+//
+//    // Notify that the current scanner has finished, so a new split can be fetched
+//    void notify_scanner_finished() {
+//        std::lock_guard<std::mutex> l(_lock);
+//        _scanner_finished = true;
+//        _cv.notify_one();
+//    }
+//
+//private:
+//    void _fetch_loop() {
+//        while (true) {
+//            {
+//                std::unique_lock<std::mutex> l(_lock);
+//                _cv.wait(l, [this]() {
+//                    return _stop || _scanner_finished || _scanners.empty();
+//                });
+//                if (_stop) {
+//                    break;
+//                }
+//                _scanner_finished = false;
+//            }
+//
+//            // Fetch a new split and create a scanner for it
+//            bool has_next = false;
+//            TFileScanRange range;
+//            auto status = _split_source->get_next(&has_next, &range);
+//            if (!status.ok()) {
+//                LOG(WARNING) << "Failed to get next split: " << status;
+//                continue;
+//            }
+//
+//            if (!has_next) {
+//                stop();
+//                break;
+//            }
+//
+//            // Create a new scanner
+//            auto scanner = std::make_shared<VFileScanner>(_state, _scan_node);
+//            status = scanner->init(_state, _scan_node, {range}, nullptr, nullptr);
+//            if (!status.ok()) {
+//                LOG(WARNING) << "Failed to init scanner: " << status;
+//                continue;
+//            }
+//
+//            {
+//                std::lock_guard<std::mutex> l(_lock);
+//                _scanners.push(scanner);
+//                _cv.notify_one();
+//            }
+//        }
+//    }
+//
+//private:
+//    std::shared_ptr<SplitSourceConnector> _split_source;
+//    RuntimeState* _state;
+//    TFileScanNode _scan_node;
+//    std::atomic<bool> _running;
+//    std::atomic<bool> _stop;
+//    std::atomic<bool> _scanner_finished{true}; // starts true so the first batch of splits gets fetched
+//    std::mutex _lock;
+//    std::condition_variable _cv;
+//    std::thread _fetch_thread;
+//    std::queue<std::shared_ptr<VFileScanner>> _scanners;
+//};
+//
+//} // namespace doris::vectorized
diff --git a/be/src/vec/exec/executor/listenable_future.h b/be/src/vec/exec/executor/listenable_future.h
new file mode 100644
index 0000000..420dc31
--- /dev/null
+++ b/be/src/vec/exec/executor/listenable_future.h
@@ -0,0 +1,250 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <condition_variable>
+#include <exception>
+#include <functional>
+#include <iostream>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "common/status.h"
+#include "glog/logging.h"
+
+namespace doris {
+namespace vectorized {
+
+struct Void {
+    bool operator==(const Void&) const { return true; }
+    bool operator!=(const Void&) const { return false; }
+};
+
+template <typename T>
+class SharedListenableFuture;
+
+template <typename T>
+class ListenableFuture {
+public:
+    using Callback = std::function<void(const T&, const doris::Status&)>;
+
+    ListenableFuture(const ListenableFuture&) = delete;
+    ListenableFuture& operator=(const ListenableFuture&) = delete;
+
+    ListenableFuture(ListenableFuture&& other) noexcept
+            : _ready(other._ready),
+              _value(std::move(other._value)),
+              _status(std::move(other._status)),
+              _callbacks(std::move(other._callbacks)) {
+        other._ready = false;
+    }
+
+    ListenableFuture& operator=(ListenableFuture&& other) noexcept {
+        if (this != &other) {
+            std::lock_guard<std::mutex> lock(mutex_);
+            std::lock_guard<std::mutex> other_lock(other.mutex_);
+            _ready = other._ready;
+            _value = std::move(other._value);
+            _status = std::move(other._status);
+            _callbacks = std::move(other._callbacks);
+            other._ready = false;
+        }
+        return *this;
+    }
+
+    ListenableFuture() : _ready(false) {}
+
+    void set_value(const T& value) { _execute(value, doris::Status::OK()); }
+
+    void set_error(const doris::Status& status) { _execute({}, status); }
+
+    void add_callback(Callback cb) {
+        std::lock_guard<std::mutex> lock(mutex_);
+        if (_ready) {
+            cb(_value, _status);
+        } else {
+            _callbacks.emplace_back(std::move(cb));
+        }
+    }
+
+    static ListenableFuture<T> create_ready(T value) {
+        ListenableFuture<T> future;
+        future.set_value(std::move(value));
+        return future;
+    }
+
+    template <typename U = T>
+    static typename std::enable_if<std::is_same<U, Void>::value, ListenableFuture<U>>::type
+    create_ready() {
+        ListenableFuture<U> future;
+        future.set_value(Void {});
+        return future;
+    }
+
+    bool is_ready() const {
+        std::lock_guard<std::mutex> lock(mutex_);
+        return _ready;
+    }
+
+    bool is_done() const {
+        std::lock_guard<std::mutex> lock(mutex_);
+        return _ready && _status.ok();
+    }
+
+    bool is_error() const {
+        std::lock_guard<std::mutex> lock(mutex_);
+        return _ready && !_status.ok();
+    }
+
+    const doris::Status& get_status() const {
+        std::lock_guard<std::mutex> lock(mutex_);
+        return _status;
+    }
+
+    Result<T> get() {
+        std::unique_lock<std::mutex> lock(mutex_);
+        while (!_ready) {
+            cv_.wait(lock);
+        }
+        if (!_status.ok()) {
+            return unexpected(_status);
+        }
+        return _value;
+    }
+
+    SharedListenableFuture<T> share() && { return SharedListenableFuture<T>(std::move(*this)); }
+
+    friend class SharedListenableFuture<T>;
+
+private:
+    void _execute(const T& value, const doris::Status& status) {
+        std::vector<Callback> tmp_callbacks;
+        {
+            std::lock_guard<std::mutex> lock(mutex_);
+            if (_ready) {
+                return;
+            }
+
+            _value = value;
+            _status = status;
+            _ready = true;
+            tmp_callbacks.swap(_callbacks);
+        }
+
+        for (auto& cb : tmp_callbacks) {
+            cb(_value, _status);
+        }
+        cv_.notify_all();
+    }
+
+    mutable std::mutex mutex_;
+    std::condition_variable cv_;
+    bool _ready;
+    T _value;
+    doris::Status _status = doris::Status::OK();
+    std::vector<Callback> _callbacks;
+};
+
+template <typename T>
+class SharedListenableFuture {
+public:
+    using Callback = typename ListenableFuture<T>::Callback;
+
+    SharedListenableFuture(const SharedListenableFuture&) = default;
+    SharedListenableFuture& operator=(const SharedListenableFuture&) = default;
+
+    SharedListenableFuture(SharedListenableFuture&&) = default;
+    SharedListenableFuture& operator=(SharedListenableFuture&&) = default;
+
+    explicit SharedListenableFuture(ListenableFuture<T>&& future)
+            : impl_(std::make_shared<ListenableFuture<T>>(std::move(future))) {}
+
+    explicit SharedListenableFuture(std::shared_ptr<ListenableFuture<T>> future_ptr)
+            : impl_(std::move(future_ptr)) {}
+
+    void add_callback(Callback cb) { return impl_->add_callback(std::move(cb)); }
+
+    bool is_ready() const { return impl_->is_ready(); }
+
+    bool is_done() const { return impl_->is_done(); }
+
+    bool is_error() const { return impl_->is_error(); }
+
+    const doris::Status& get_status() const { return impl_->get_status(); }
+
+    Result<T> get() { return impl_->get(); }
+
+    static SharedListenableFuture<T> create_ready(T value) {
+        return SharedListenableFuture<T>(ListenableFuture<T>::create_ready(std::move(value)));
+    }
+
+    SharedListenableFuture() : impl_(std::make_shared<ListenableFuture<T>>()) {}
+
+    void set_value(const T& value) { impl_->set_value(value); }
+
+    void set_error(const doris::Status& status) { impl_->set_error(status); }
+
+    template <typename U = T>
+    static typename std::enable_if<std::is_same<U, Void>::value, SharedListenableFuture<U>>::type
+    create_ready() {
+        return SharedListenableFuture<U>(ListenableFuture<U>::create_ready());
+    }
+
+private:
+    std::shared_ptr<ListenableFuture<T>> impl_;
+};
+
+namespace listenable_future {
+inline SharedListenableFuture<Void> null_future =
+        SharedListenableFuture<Void>::create_ready(Void {});
+} // namespace listenable_future
+
+} // namespace vectorized
+} // namespace doris
+
+// Usage example (stale: add_callback/then signatures below do not match the class above)
+//int main() {
+//    ListenableFuture<int> future;
+//
+//    // 添加回调
+//    future.add_callback(
+//            [](int value) {
+//                printf("Got value: %d\n", value);
+//            },
+//            [](auto e) {
+//                try {
+//                    if (e) std::rethrow_exception(e);
+//                } catch (const std::exception& ex) {
+//                    printf("Exception: %s\n", ex.what());
+//                }
+//            }
+//    );
+//
+//    // 链式调用
+//    auto next = future.then([](int x) {
+//                          return x * 2;
+//                      }).then([](int x) {
+//        return std::to_string(x);
+//    });
+//
+//    // 设置结果
+//    future.set_value(42);
+//
+//    return 0;
+//}
\ No newline at end of file
diff --git a/be/src/vec/exec/executor/split_runner.h b/be/src/vec/exec/executor/split_runner.h
new file mode 100644
index 0000000..49d8dd1
--- /dev/null
+++ b/be/src/vec/exec/executor/split_runner.h
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <memory>
+#include <string>
+
+#include "common/status.h"
+#include "vec/exec/executor/listenable_future.h"
+
+namespace doris {
+namespace vectorized {
+
+class SplitRunner {
+public:
+    virtual ~SplitRunner() = default;
+    virtual Status init() = 0;
+    virtual Result<SharedListenableFuture<Void>> process_for(std::chrono::nanoseconds duration) = 0;
+    virtual void close(const Status& status) = 0;
+    virtual bool is_finished() = 0;
+    virtual Status finished_status() = 0;
+    virtual std::string get_info() const = 0;
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/stats.h b/be/src/vec/exec/executor/stats.h
new file mode 100644
index 0000000..c86961b
--- /dev/null
+++ b/be/src/vec/exec/executor/stats.h
@@ -0,0 +1,104 @@
+//// Licensed to the Apache Software Foundation (ASF) under one
+//// or more contributor license agreements.  See the NOTICE file
+//// distributed with this work for additional information
+//// regarding copyright ownership.  The ASF licenses this file
+//// to you under the Apache License, Version 2.0 (the
+//// "License"); you may not use this file except in compliance
+//// with the License.  You may obtain a copy of the License at
+////
+////   http://www.apache.org/licenses/LICENSE-2.0
+////
+//// Unless required by applicable law or agreed to in writing,
+//// software distributed under the License is distributed on an
+//// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//// KIND, either express or implied.  See the License for the
+//// specific language governing permissions and limitations
+//// under the License.
+//
+//#pragma once
+//#include <array>
+//#include <atomic>
+//#include <chrono>
+//#include <cmath>
+//#include <vector>
+//
+//namespace doris {
+//namespace vectorized {
+//
+//class TimeStats {
+//public:
+//    void add(std::chrono::nanoseconds duration) {
+//        totalTime.fetch_add(duration.count());
+//        count.fetch_add(1);
+//    }
+//
+//    double getAverage() const {
+//        auto total = totalTime.load();
+//        auto cnt = count.load();
+//        return cnt > 0 ? static_cast<double>(total) / cnt : 0.0;
+//    }
+//
+//private:
+//    std::atomic<int64_t> totalTime {0};
+//    std::atomic<int64_t> count {0};
+//};
+//
+//class TimeDistribution {
+//public:
+//    void add(std::chrono::microseconds duration) {
+//        auto bucket = getBucket(duration.count());
+//        buckets[bucket].fetch_add(1);
+//        count.fetch_add(1);
+//    }
+//
+//private:
+//    static constexpr size_t BUCKET_COUNT = 32;
+//    std::array<std::atomic<int64_t>, BUCKET_COUNT> buckets {};
+//    std::atomic<int64_t> count {0};
+//
+//    static size_t getBucket(int64_t value) {
+//        return std::min(static_cast<size_t>(std::log2(static_cast<double>(value) + 1)),
+//                        BUCKET_COUNT - 1);
+//    }
+//};
+//
+//class CounterStats {
+//public:
+//    void add(int64_t value) { total.fetch_add(value); }
+//
+//    int64_t getTotal() const { return total.load(); }
+//
+//private:
+//    std::atomic<int64_t> total {0};
+//};
+//
+//class DistributionStats {
+//public:
+//    void add(double value) {
+//        count.fetch_add(1);
+//        sum.fetch_add(value);
+//        updateMin(value);
+//        updateMax(value);
+//    }
+//
+//private:
+//    std::atomic<int64_t> count {0};
+//    std::atomic<double> sum {0.0};
+//    std::atomic<double> min {std::numeric_limits<double>::max()};
+//    std::atomic<double> max {std::numeric_limits<double>::lowest()};
+//
+//    void updateMin(double value) {
+//        double current = min.load();
+//        while (value < current && !min.compare_exchange_weak(current, value)) {
+//        }
+//    }
+//
+//    void updateMax(double value) {
+//        double current = max.load();
+//        while (value > current && !max.compare_exchange_weak(current, value)) {
+//        }
+//    }
+//};
+//
+//} // namespace vectorized
+//} // namespace doris
diff --git a/be/src/vec/exec/executor/task_executor.h b/be/src/vec/exec/executor/task_executor.h
new file mode 100644
index 0000000..e6bf89c
--- /dev/null
+++ b/be/src/vec/exec/executor/task_executor.h
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <future>
+#include <memory>
+#include <optional>
+#include <vector>
+
+#include "vec/exec/executor/listenable_future.h"
+#include "vec/exec/executor/split_runner.h"
+#include "vec/exec/executor/task_id.h"
+
+namespace doris {
+namespace vectorized {
+
+class TaskHandle;
+
+class TaskExecutor {
+public:
+    virtual ~TaskExecutor() = default;
+
+    virtual Status init() = 0;
+    virtual Status start() = 0;
+    virtual void stop() = 0;
+
+    virtual Result<std::shared_ptr<TaskHandle>> create_task(
+            const TaskId& task_id, std::function<double()> utilization_supplier,
+            int initial_split_concurrency,
+            std::chrono::nanoseconds split_concurrency_adjust_frequency,
+            std::optional<int> max_drivers_per_task) = 0;
+
+    virtual Status add_task(const TaskId& task_id, std::shared_ptr<TaskHandle> task_handle) = 0;
+
+    virtual Status remove_task(std::shared_ptr<TaskHandle> task_handle) = 0;
+
+    virtual Result<std::vector<SharedListenableFuture<Void>>> enqueue_splits(
+            std::shared_ptr<TaskHandle> task_handle, bool intermediate,
+            const std::vector<std::shared_ptr<SplitRunner>>& splits) = 0;
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/task_handle.h b/be/src/vec/exec/executor/task_handle.h
new file mode 100644
index 0000000..0193ab4
--- /dev/null
+++ b/be/src/vec/exec/executor/task_handle.h
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/status.h"
+#include "vec/exec/executor/task_id.h"
+
+namespace doris {
+namespace vectorized {
+
+class TaskHandle {
+public:
+    virtual ~TaskHandle() = default;
+    virtual Status init() = 0;
+    virtual bool is_closed() const = 0;
+    virtual TaskId task_id() const = 0;
+};
+
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/executor/task_id.h b/be/src/vec/exec/executor/task_id.h
new file mode 100644
index 0000000..347bafe
--- /dev/null
+++ b/be/src/vec/exec/executor/task_id.h
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <memory>
+#include <string>
+
+namespace doris {
+namespace vectorized {
+
+class TaskId {
+public:
+    TaskId(const std::string& id) : _id(id) {}
+    std::string to_string() const { return _id; }
+    bool operator==(const TaskId& other) const { return _id == other._id; }
+
+private:
+    std::string _id;
+};
+
+} // namespace vectorized
+} // namespace doris
+
+namespace std {
+template <>
+struct hash<doris::vectorized::TaskId> {
+    size_t operator()(const doris::vectorized::TaskId& task_id) const {
+        return std::hash<std::string> {}(task_id.to_string());
+    }
+};
+} // namespace std
diff --git a/be/src/vec/exec/executor/ticker.h b/be/src/vec/exec/executor/ticker.h
new file mode 100644
index 0000000..5b36e26
--- /dev/null
+++ b/be/src/vec/exec/executor/ticker.h
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <atomic>
+#include <chrono>
+#include <mutex>
+
+namespace doris {
+namespace vectorized {
+
+class Ticker {
+public:
+    virtual ~Ticker() = default;
+
+    /**
+     * Returns the number of nanoseconds since a fixed reference point
+     */
+    virtual int64_t read() const = 0;
+
+protected:
+    Ticker() = default;
+};
+
+class SystemTicker : public Ticker {
+public:
+    int64_t read() const override {
+        return std::chrono::duration_cast<std::chrono::nanoseconds>(
+                       std::chrono::steady_clock::now().time_since_epoch())
+                .count();
+    }
+};
+
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/executor/time_sharing/multilevel_split_queue.cpp b/be/src/vec/exec/executor/time_sharing/multilevel_split_queue.cpp
new file mode 100644
index 0000000..9434b6c
--- /dev/null
+++ b/be/src/vec/exec/executor/time_sharing/multilevel_split_queue.cpp
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/executor/time_sharing/multilevel_split_queue.h"
+
+#include <algorithm>
+#include <cmath>
+#include <numeric>
+#include <unordered_set>
+
+namespace doris {
+namespace vectorized {
+
+MultilevelSplitQueue::MultilevelSplitQueue(double level_time_multiplier)
+        : _level_time_multiplier(level_time_multiplier), _level_waiting_splits() {
+    for (size_t i = 0; i < LEVEL_THRESHOLD_SECONDS.size(); ++i) {
+        _level_scheduled_time[i].store(0);
+        _level_min_priority[i].store(-1);
+    }
+}
+
+int MultilevelSplitQueue::compute_level(int64_t scheduled_nanos) {
+    double scheduled_seconds = static_cast<double>(scheduled_nanos) / 1000000000.0;
+
+    for (int level = 0; level < (LEVEL_THRESHOLD_SECONDS.size() - 1); level++) {
+        if (scheduled_seconds < LEVEL_THRESHOLD_SECONDS[level + 1]) {
+            return level;
+        }
+    }
+
+    return LEVEL_THRESHOLD_SECONDS.size() - 1;
+}
+
+/**
+ * "Charges" the quanta run time both to the task *and* to the level it
+ * belongs to, in an effort to maintain the target thread utilization ratios
+ * between levels and to maintain fairness within a level.
+ *
+ * Consider an example split where a read hung for several minutes. This is
+ * either a bug or a failing dependency. In either case we do not want to
+ * charge the task too much, and we especially do not want to charge the
+ * level too much - i.e. cause other queries in this level to starve.
+ *
+ * @return the new priority for the task
+ */
+Priority MultilevelSplitQueue::update_priority(const Priority& old_priority, int64_t quanta_nanos,
+                                               int64_t scheduled_nanos) {
+    int old_level = old_priority.level();
+    int new_level = compute_level(scheduled_nanos);
+
+    int64_t level_contribution = std::min(quanta_nanos, LEVEL_CONTRIBUTION_CAP);
+
+    if (old_level == new_level) {
+        _level_scheduled_time[old_level].fetch_add(level_contribution);
+        return Priority(old_level, old_priority.level_priority() + quanta_nanos);
+    }
+
+    int64_t remaining_level_contribution = level_contribution;
+    int64_t remaining_task_time = quanta_nanos;
+
+    for (int current_level = old_level; current_level < new_level; ++current_level) {
+        int64_t level_time_threshold =
+                static_cast<int64_t>((LEVEL_THRESHOLD_SECONDS[current_level + 1] -
+                                      LEVEL_THRESHOLD_SECONDS[current_level]) *
+                                     1e9);
+
+        int64_t time_accrued_to_level =
+                std::min(level_time_threshold, remaining_level_contribution);
+        _level_scheduled_time[current_level].fetch_add(time_accrued_to_level);
+        remaining_level_contribution -= time_accrued_to_level;
+        remaining_task_time -= time_accrued_to_level;
+    }
+
+    _level_scheduled_time[new_level].fetch_add(remaining_level_contribution);
+    int64_t new_level_min_priority = get_level_min_priority(new_level, scheduled_nanos);
+    return Priority(new_level, new_level_min_priority + remaining_task_time);
+}
+
+int64_t MultilevelSplitQueue::get_level_min_priority(int level, int64_t task_thread_usage_nanos) {
+    int64_t expected = -1;
+    _level_min_priority[level].compare_exchange_strong(expected, task_thread_usage_nanos);
+    return _level_min_priority[level].load();
+}
+
+void MultilevelSplitQueue::offer(std::shared_ptr<PrioritizedSplitRunner> split) {
+    split->set_ready();
+    int level = split->priority().level();
+    std::unique_lock<std::mutex> lock(_mutex);
+    _offer_locked(split, level, lock);
+}
+
+void MultilevelSplitQueue::_offer_locked(std::shared_ptr<PrioritizedSplitRunner> split, int level,
+                                         std::unique_lock<std::mutex>& lock) {
+    if (_level_waiting_splits[level].empty()) {
+        // Accesses to _level_scheduled_time are not synchronized, so we have a data race
+        // here - our level time math will be off. However, the staleness is bounded by
+        // the fact that only running splits that complete during this computation
+        // can update the level time. Therefore, this is benign.
+        int64_t level0_time = _get_level0_target_time(lock);
+        int64_t level_expected_time =
+                static_cast<int64_t>(level0_time / std::pow(_level_time_multiplier, level));
+        int64_t delta = level_expected_time - _level_scheduled_time[level].load();
+        _level_scheduled_time[level].fetch_add(delta);
+    }
+
+    _level_waiting_splits[level].push(split);
+    _not_empty.notify_all();
+}
+
+std::shared_ptr<PrioritizedSplitRunner> MultilevelSplitQueue::take() {
+    std::unique_lock<std::mutex> lock(_mutex);
+
+    while (!_interrupted) {
+        auto split = _poll_split(lock);
+        if (split) {
+            if (split->update_level_priority()) {
+                _offer_locked(split, split->priority().level(), lock);
+                continue;
+            }
+            int selected_level = split->priority().level();
+            _level_min_priority[selected_level].store(split->priority().level_priority());
+            return split;
+        }
+
+        _not_empty.wait(lock);
+    }
+    return nullptr;
+}
+
+/**
+ * Attempts to give each level a target amount of scheduled time, which is
+ * configurable via `_level_time_multiplier`.
+ *
+ * Selects the level with the lowest ratio of actual to target scheduled
+ * time, minimizing deviation from the target. From the chosen level, the
+ * split with the lowest priority is picked.
+ */
+// Caller must hold _mutex (documented by the `lock` parameter). Scans the
+// levels, picking the non-empty level whose actual scheduled time is furthest
+// below its target (i.e. the highest target/actual ratio), then returns that
+// level's lowest-priority split. Returns nullptr when every level is empty.
+std::shared_ptr<PrioritizedSplitRunner> MultilevelSplitQueue::_poll_split(
+        std::unique_lock<std::mutex>& lock) {
+    int64_t target_scheduled_time = _get_level0_target_time(lock);
+    double worst_ratio = 1.0;
+    int selected_level = -1;
+
+    for (int level = 0; level < LEVEL_THRESHOLD_SECONDS.size(); ++level) {
+        if (!_level_waiting_splits[level].empty()) {
+            int64_t level_time = _level_scheduled_time[level].load();
+            double ratio = (level_time == 0) ? 0
+                                             : static_cast<double>(target_scheduled_time) /
+                                                       (1.0 * level_time);
+
+            if (selected_level == -1 || ratio > worst_ratio) {
+                worst_ratio = ratio;
+                selected_level = level;
+            }
+        }
+        // Each deeper level's target is the previous level's target divided by
+        // the configured multiplier.
+        target_scheduled_time =
+                static_cast<int64_t>(std::round(target_scheduled_time / _level_time_multiplier));
+    }
+
+    if (selected_level == -1) return nullptr;
+
+    // top() is the minimum (level_priority, worker_id) thanks to the inverted
+    // SplitRunnerComparator.
+    auto result = _level_waiting_splits[selected_level].top();
+    _level_waiting_splits[selected_level].pop();
+    return result;
+}
+
+// Removes `split` from whichever level queue holds it. std::priority_queue has
+// no erase, so each level is drained into a fresh queue while skipping the
+// target — O(n log n) per level, acceptable because removal is rare.
+void MultilevelSplitQueue::remove(std::shared_ptr<PrioritizedSplitRunner> split) {
+    std::lock_guard<std::mutex> lock(_mutex);
+
+    for (auto& level_queue : _level_waiting_splits) {
+        std::priority_queue<std::shared_ptr<PrioritizedSplitRunner>,
+                            std::vector<std::shared_ptr<PrioritizedSplitRunner>>,
+                            SplitRunnerComparator>
+                new_queue;
+
+        while (!level_queue.empty()) {
+            auto current = level_queue.top();
+            level_queue.pop();
+            // Identity comparison on the shared_ptr: only the exact instance is dropped.
+            if (current != split) {
+                new_queue.emplace(std::move(current));
+            }
+        }
+        level_queue.swap(new_queue);
+    }
+
+    // NOTE(review): notifying when the queue became EMPTY looks inverted —
+    // take() waiters only make progress when splits arrive or interrupt() is
+    // called. It is harmless (they re-check and sleep), but confirm the intent.
+    if (std::all_of(_level_waiting_splits.begin(), _level_waiting_splits.end(),
+                    [](const auto& q) { return q.empty(); })) {
+        _not_empty.notify_all();
+    }
+}
+
+// Batch variant of remove(): drops every split in `splits` from the waiting
+// queues in a single pass per level. The hash set makes membership checks O(1)
+// while each level queue is rebuilt.
+void MultilevelSplitQueue::remove_all(
+        const std::vector<std::shared_ptr<PrioritizedSplitRunner>>& splits) {
+    std::lock_guard<std::mutex> lock(_mutex);
+
+    std::unordered_set<std::shared_ptr<PrioritizedSplitRunner>> to_remove(splits.begin(),
+                                                                          splits.end());
+
+    for (auto& level_queue : _level_waiting_splits) {
+        std::priority_queue<std::shared_ptr<PrioritizedSplitRunner>,
+                            std::vector<std::shared_ptr<PrioritizedSplitRunner>>,
+                            SplitRunnerComparator>
+                new_queue;
+
+        while (!level_queue.empty()) {
+            auto current = level_queue.top();
+            level_queue.pop();
+            if (!to_remove.count(current)) {
+                new_queue.emplace(std::move(current));
+            }
+        }
+        level_queue.swap(new_queue);
+    }
+
+    // NOTE(review): same notify-when-empty pattern as remove(); see note there.
+    if (std::all_of(_level_waiting_splits.begin(), _level_waiting_splits.end(),
+                    [](const auto& q) { return q.empty(); })) {
+        _not_empty.notify_all();
+    }
+}
+
+// Total number of waiting splits across all levels.
+size_t MultilevelSplitQueue::size() const {
+    std::lock_guard<std::mutex> lock(_mutex);
+    return std::accumulate(_level_waiting_splits.begin(), _level_waiting_splits.end(), size_t(0),
+                           [](size_t sum, const auto& queue) { return sum + queue.size(); });
+}
+
+// Irreversibly signals take() to stop blocking and return nullptr.
+void MultilevelSplitQueue::interrupt() {
+    std::lock_guard<std::mutex> lock(_mutex);
+    _interrupted = true;
+    _not_empty.notify_all();
+}
+
+// Caller must hold _mutex (documented by the `lock` parameter). Projects each
+// level's accumulated scheduled time back into level-0 units and returns the
+// maximum: level i is expected to receive 1/multiplier of level i-1's share,
+// so level i's time counts for multiplier^i in level-0 terms.
+int64_t MultilevelSplitQueue::_get_level0_target_time(std::unique_lock<std::mutex>& lock) {
+    int64_t level0_target_time = _level_scheduled_time[0].load();
+    double current_multiplier = _level_time_multiplier;
+
+    for (int level = 0; level < LEVEL_THRESHOLD_SECONDS.size(); ++level) {
+        // After this division current_multiplier == multiplier^(-level), so
+        // dividing level_time by it scales it UP by multiplier^level.
+        current_multiplier /= _level_time_multiplier;
+        int64_t level_time = _level_scheduled_time[level].load();
+        level0_target_time =
+                std::max(level0_target_time, static_cast<int64_t>(level_time / current_multiplier));
+    }
+    return level0_target_time;
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/time_sharing/multilevel_split_queue.h b/be/src/vec/exec/executor/time_sharing/multilevel_split_queue.h
new file mode 100644
index 0000000..50eb6a3
--- /dev/null
+++ b/be/src/vec/exec/executor/time_sharing/multilevel_split_queue.h
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <array>
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <queue>
+
+#include "common/factory_creator.h"
+#include "vec/exec/executor/time_sharing/prioritized_split_runner.h"
+#include "vec/exec/executor/time_sharing/priority.h"
+
+namespace doris {
+namespace vectorized {
+
+// Ordering for std::priority_queue, which is a max-heap: returning
+// "greater-than" here inverts it so that top() yields the split with the
+// SMALLEST (level_priority, worker_id) — i.e. the next split to run.
+// worker_id breaks ties deterministically (FIFO by creation order).
+struct SplitRunnerComparator {
+    bool operator()(const std::shared_ptr<PrioritizedSplitRunner>& a,
+                    const std::shared_ptr<PrioritizedSplitRunner>& b) const {
+        const auto a_priority = a->priority().level_priority();
+        const auto b_priority = b->priority().level_priority();
+        if (a_priority != b_priority) {
+            return a_priority > b_priority;
+        }
+
+        return a->worker_id() > b->worker_id();
+    }
+};
+
+/**
+ * Multi-level feedback split queue: splits are bucketed into levels by total
+ * accrued scheduled time (LEVEL_THRESHOLD_SECONDS), each level holding a
+ * min-ordered priority queue. take() balances levels so level i receives
+ * roughly 1/level_time_multiplier of level i-1's scheduled-time share.
+ */
+class MultilevelSplitQueue {
+    ENABLE_FACTORY_CREATOR(MultilevelSplitQueue);
+
+public:
+    // Cumulative scheduled-time thresholds (seconds) at which a split moves to
+    // the next level; array size fixes the number of levels.
+    static constexpr std::array<int, 5> LEVEL_THRESHOLD_SECONDS = {0, 1, 10, 60, 300};
+    static constexpr int64_t LEVEL_CONTRIBUTION_CAP = 30 * 1000000000LL; // 30 seconds in nanos
+
+    explicit MultilevelSplitQueue(double level_time_multiplier);
+
+    static int compute_level(int64_t scheduled_nanos);
+
+    Priority update_priority(const Priority& old_priority, int64_t quanta_nanos,
+                             int64_t scheduled_nanos);
+    int64_t get_level_min_priority(int level, int64_t scheduled_nanos);
+    void offer(std::shared_ptr<PrioritizedSplitRunner> split);
+    std::shared_ptr<PrioritizedSplitRunner> take();
+    size_t size() const;
+    void remove(std::shared_ptr<PrioritizedSplitRunner> split);
+    void remove_all(const std::vector<std::shared_ptr<PrioritizedSplitRunner>>& splits);
+    void interrupt();
+
+    // Lock-free read of a level's accrued scheduled time (test/stats hook).
+    int64_t level_scheduled_time(int level) const { return _level_scheduled_time[level].load(); }
+
+private:
+    int64_t _get_level0_target_time(std::unique_lock<std::mutex>& lock);
+    std::shared_ptr<PrioritizedSplitRunner> _poll_split(std::unique_lock<std::mutex>& lock);
+    void _offer_locked(std::shared_ptr<PrioritizedSplitRunner> split, int level,
+                       std::unique_lock<std::mutex>& lock);
+
+    // Ratio between consecutive levels' target scheduled-time shares.
+    const double _level_time_multiplier;
+
+    // One waiting queue per level; guarded by _mutex.
+    std::array<std::priority_queue<std::shared_ptr<PrioritizedSplitRunner>,
+                                   std::vector<std::shared_ptr<PrioritizedSplitRunner>>,
+                                   SplitRunnerComparator>,
+               LEVEL_THRESHOLD_SECONDS.size()>
+            _level_waiting_splits;
+
+    // Atomics: also read without holding _mutex (see level_scheduled_time()
+    // and the benign-race comment in _offer_locked).
+    std::array<std::atomic<int64_t>, LEVEL_THRESHOLD_SECONDS.size()> _level_scheduled_time;
+    std::array<std::atomic<int64_t>, LEVEL_THRESHOLD_SECONDS.size()> _level_min_priority;
+
+    std::atomic<bool> _interrupted {false};
+    mutable std::mutex _mutex;
+    std::condition_variable _not_empty;
+
+    //    std::array<std::shared_ptr<CounterStats>, LEVEL_THRESHOLD_SECONDS.size()>
+    //            _selected_level_counters;
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/time_sharing/prioritized_split_runner.cpp b/be/src/vec/exec/executor/time_sharing/prioritized_split_runner.cpp
new file mode 100644
index 0000000..a9e97ec
--- /dev/null
+++ b/be/src/vec/exec/executor/time_sharing/prioritized_split_runner.cpp
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/executor/time_sharing/prioritized_split_runner.h"
+
+#include <fmt/format.h>
+
+#include <chrono>
+#include <functional>
+#include <thread>
+
+#include "vec/exec/executor/time_sharing/time_sharing_task_handle.h"
+
+namespace doris {
+namespace vectorized {
+
+// Process-wide monotonic source of unique worker ids (used as a deterministic
+// tie-breaker in SplitRunnerComparator).
+std::atomic<int64_t> PrioritizedSplitRunner::_next_worker_id(0);
+
+PrioritizedSplitRunner::PrioritizedSplitRunner(std::shared_ptr<TimeSharingTaskHandle> task_handle,
+                                               int split_id,
+                                               std::shared_ptr<SplitRunner> split_runner,
+                                               std::shared_ptr<Ticker> ticker)
+        : _task_handle(std::move(task_handle)),
+          _split_id(split_id),
+          _worker_id(_next_worker_id.fetch_add(1, std::memory_order_relaxed)),
+          _split_runner(std::move(split_runner)),
+          _ticker(ticker) {
+    // Seed the cached priority from the task handle before the split can be queued.
+    update_level_priority();
+}
+
+//PrioritizedSplitRunner::PrioritizedSplitRunner(CounterStats& globalCpuTimeMicros,
+//                                               CounterStats& globalScheduledTimeMicros,
+//                                               TimeStats& blockedQuantaWallTime,
+//                                               TimeStats& unblockedQuantaWallTime)
+//        : _created_nanos(std::chrono::steady_clock::now().time_since_epoch().count()),
+//          _split_id(0),
+//          _worker_id(_next_worker_id.fetch_add(1, std::memory_order_relaxed)),
+//          _ticker(std::make_shared<SystemTicker>()),
+//          _global_cpu_time_micros(globalCpuTimeMicros),
+//          _global_scheduled_time_micros(globalScheduledTimeMicros),
+//          _blocked_quanta_wall_time(blockedQuantaWallTime),
+//          _unblocked_quanta_wall_time(unblockedQuantaWallTime) {
+//}
+
+// Delegates initialization to the wrapped SplitRunner.
+Status PrioritizedSplitRunner::init() {
+    return _split_runner->init();
+}
+
+std::shared_ptr<TimeSharingTaskHandle> PrioritizedSplitRunner::task_handle() const {
+    return _task_handle;
+}
+
+// Future completed (with Void) the first time is_finished() observes the
+// wrapped runner reporting finished.
+SharedListenableFuture<Void> PrioritizedSplitRunner::finished_future() {
+    return _finished_future;
+}
+
+bool PrioritizedSplitRunner::is_closed() const {
+    return _closed.load();
+}
+
+// Closes the wrapped runner exactly once; exchange() makes concurrent or
+// repeated close() calls idempotent.
+void PrioritizedSplitRunner::close(const Status& status) {
+    if (!_closed.exchange(true)) {
+        _split_runner->close(status);
+    }
+}
+
+int64_t PrioritizedSplitRunner::created_nanos() const {
+    return _created_nanos;
+}
+
+// Reports whether this split should stop being scheduled. Side effect:
+// completes _finished_future when the underlying runner is done.
+// NOTE(review): the future is NOT completed when the split is merely closed or
+// its task handle closed (the || arms below) — confirm that waiters on
+// finished_future() are satisfied on the close path elsewhere.
+bool PrioritizedSplitRunner::is_finished() {
+    bool finished = _split_runner->is_finished();
+    if (finished) {
+        _finished_future.set_value({});
+    }
+    return finished || _closed.load() || _task_handle->is_closed();
+}
+
+Status PrioritizedSplitRunner::finished_status() {
+    return _split_runner->finished_status();
+}
+
+// Total time this split has spent executing inside process(), in nanos.
+int64_t PrioritizedSplitRunner::scheduled_nanos() const {
+    return _scheduled_nanos.load();
+}
+
+/**
+ * Runs the wrapped SplitRunner for one quantum (SPLIT_RUN_QUANTA), accounts the
+ * elapsed scheduled time to this split and its task handle, and refreshes the
+ * cached priority. Returns the runner's "blocked" future (completed when the
+ * split is runnable again), or the runner's error.
+ */
+Result<SharedListenableFuture<Void>> PrioritizedSplitRunner::process() {
+    if (is_closed()) {
+        SharedListenableFuture<Void> future;
+        future.set_value({});
+        return future;
+    }
+
+    // Read the injected ticker (not steady_clock directly): _last_ready is
+    // stamped via _ticker in set_ready(), so the wait-time subtraction below is
+    // only meaningful when both timestamps come from the same clock source.
+    auto start_nanos = _ticker->read();
+    // Record first-call timestamps. Each CAS needs its OWN expected variable:
+    // compare_exchange_strong overwrites `expected` with the current value on
+    // failure, so reusing one variable would make the second CAS compare
+    // _last_ready against _start's value instead of 0 on every call after the
+    // first.
+    int64_t expected_start = 0;
+    _start.compare_exchange_strong(expected_start, start_nanos);
+    int64_t expected_ready = 0;
+    _last_ready.compare_exchange_strong(expected_ready, start_nanos);
+    _process_calls.fetch_add(1);
+
+    // Time spent between becoming runnable and actually running.
+    _wait_nanos.fetch_add(start_nanos - _last_ready.load());
+
+    auto process_start_time = _ticker->read();
+    auto blocked = _split_runner->process_for(SPLIT_RUN_QUANTA);
+    if (!blocked.has_value()) {
+        return unexpected(std::move(blocked).error());
+    }
+    auto process_end_time = _ticker->read();
+    auto quanta_scheduled_nanos = process_end_time - process_start_time;
+
+    _scheduled_nanos.fetch_add(quanta_scheduled_nanos);
+
+    {
+        // Refresh the cached priority from the task handle's new accrued time.
+        std::lock_guard<std::mutex> lock(_priority_mutex);
+        _priority = _task_handle->add_scheduled_nanos(quanta_scheduled_nanos);
+    }
+
+    return blocked;
+}
+
+// Stamps the moment this split became runnable. process() subtracts this from
+// its start timestamp to accumulate _wait_nanos, so both sides must read the
+// same clock source (_ticker).
+void PrioritizedSplitRunner::set_ready() {
+    _last_ready.store(_ticker->read());
+}
+
+/**
+ * Updates the (potentially stale) priority value cached in this object.
+ * This should be called when this object is outside the queue.
+ *
+ * @return true if the level changed.
+ */
+//bool PrioritizedSplitRunner::update_level_priority() {
+//    Priority new_priority = _task_handle->priority();
+//    Priority old_priority = _priority.exchange(new_priority);
+//    return new_priority.level() != old_priority.level();
+//}
+
+// Mutex-guarded replacement for the commented-out std::atomic<Priority>
+// variant above; semantics are identical.
+bool PrioritizedSplitRunner::update_level_priority() {
+    std::lock_guard<std::mutex> lock(_priority_mutex);
+    Priority new_priority = _task_handle->priority();
+    Priority old_priority = _priority;
+    _priority = new_priority;
+    return new_priority.level() != old_priority.level();
+}
+
+//
+//void PrioritizedSplitRunner::reset_level_priority() {
+//     _priority.store(_task_handle->reset_level_priority());
+//}
+
+// Replaces the cached priority with the task handle's reset value.
+void PrioritizedSplitRunner::reset_level_priority() {
+    std::lock_guard<std::mutex> lock(_priority_mutex);
+    _priority = _task_handle->reset_level_priority();
+}
+
+int64_t PrioritizedSplitRunner::worker_id() const {
+    return _worker_id;
+}
+
+int PrioritizedSplitRunner::split_id() const {
+    return _split_id;
+}
+
+//Priority PrioritizedSplitRunner::priority() const {
+//    return _priority.load();
+//}
+
+// Returns a copy of the cached priority under the mutex.
+Priority PrioritizedSplitRunner::priority() const {
+    std::lock_guard<std::mutex> lock(_priority_mutex);
+    return _priority;
+}
+
+// Human-readable debug summary of this split. The formatted output below is
+// kept for reference but disabled until the required stats counters
+// (_global_cpu_time_micros etc.) are wired in; currently returns "".
+std::string PrioritizedSplitRunner::get_info() const {
+    // const auto now = std::chrono::steady_clock::now();
+    // const int64_t current_time = now.time_since_epoch().count();
+    // const int64_t wall_time = (current_time - _start.load()) / 1'000'000; // convert to milliseconds
+
+    // return fmt::format(
+    //     "Split {:<15}-{} {} (start = {:.1f}, wall = {} ms, cpu = {} ms, wait = {} ms, calls = {})",
+    //     _task_handle->get_task_id(),       // assumed to return a string type
+    //     _split_id,                         // integer
+    //     _split_runner->get_info(),         // requires SplitRunner::get_info()
+    //     _start.load() / 1'000'000.0,       // to milliseconds, one decimal place
+    //     wall_time,                         // precomputed elapsed wall time
+    //     _global_cpu_time_micros.get() / 1000, // microseconds to milliseconds
+    //     _wait_nanos.load() / 1'000'000,    // nanoseconds to milliseconds
+    //     _process_calls.load()              // number of process() calls
+    // );
+    return "";
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/time_sharing/prioritized_split_runner.h b/be/src/vec/exec/executor/time_sharing/prioritized_split_runner.h
new file mode 100644
index 0000000..b7e396b
--- /dev/null
+++ b/be/src/vec/exec/executor/time_sharing/prioritized_split_runner.h
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <atomic>
+#include <memory>
+
+#include "common/factory_creator.h"
+#include "vec/exec/executor/listenable_future.h"
+#include "vec/exec/executor/split_runner.h"
+#include "vec/exec/executor/stats.h"
+#include "vec/exec/executor/ticker.h"
+#include "vec/exec/executor/time_sharing/priority.h"
+
+namespace doris {
+namespace vectorized {
+
+class TimeSharingTaskHandle;
+
+/**
+ * Adapter that makes a SplitRunner schedulable by MultilevelSplitQueue: caches
+ * the split's Priority, tracks scheduling statistics (wait time, scheduled
+ * nanos, process() call count) and exposes a future completed when the split
+ * finishes.
+ */
+class PrioritizedSplitRunner : public std::enable_shared_from_this<PrioritizedSplitRunner> {
+    ENABLE_FACTORY_CREATOR(PrioritizedSplitRunner);
+
+public:
+    // Maximum wall time the wrapped runner executes per process() invocation.
+    static constexpr auto SPLIT_RUN_QUANTA = std::chrono::seconds(1);
+
+    PrioritizedSplitRunner(std::shared_ptr<TimeSharingTaskHandle> task_handle, int split_id,
+                           std::shared_ptr<SplitRunner> split_runner,
+                           std::shared_ptr<Ticker> ticker);
+
+    //    PrioritizedSplitRunner(CounterStats& globalCpuTimeMicros,
+    //                           CounterStats& globalScheduledTimeMicros,
+    //                           TimeStats& blockedQuantaWallTime, TimeStats& unblockedQuantaWallTime);
+
+    Status init();
+
+    virtual ~PrioritizedSplitRunner() = default;
+
+    std::shared_ptr<TimeSharingTaskHandle> task_handle() const;
+    SharedListenableFuture<Void> finished_future();
+    bool is_closed() const;
+    void close(const Status& status);
+    int64_t created_nanos() const;
+    bool is_finished();
+    Status finished_status();
+    int64_t scheduled_nanos() const;
+    Result<SharedListenableFuture<Void>> process();
+    void set_ready();
+    bool update_level_priority();
+    void reset_level_priority();
+    int64_t worker_id() const;
+    int split_id() const;
+    virtual Priority priority() const;
+
+    std::string get_info() const;
+
+private:
+    // Source of process-wide unique worker ids (comparator tie-breaker).
+    static std::atomic<int64_t> _next_worker_id;
+
+    const int64_t _created_nanos {std::chrono::steady_clock::now().time_since_epoch().count()};
+    std::shared_ptr<TimeSharingTaskHandle> _task_handle;
+    const int _split_id;
+    const int64_t _worker_id;
+    std::shared_ptr<SplitRunner> _split_runner;
+    // Injected clock; presumably allows deterministic time in tests — confirm.
+    std::shared_ptr<Ticker> _ticker;
+    SharedListenableFuture<Void> _finished_future {};
+
+    std::atomic<bool> _closed {false};
+    //    std::atomic<Priority> _priority {Priority {0, 0}};
+    // Cached priority; guarded by _priority_mutex (replaces the commented-out
+    // std::atomic<Priority> variant above).
+    Priority _priority {0, 0};
+    mutable std::mutex _priority_mutex;
+    std::atomic<int64_t> _last_ready {0};
+    std::atomic<int64_t> _start {0};
+    std::atomic<int64_t> _scheduled_nanos {0};
+    std::atomic<int64_t> _wait_nanos {0};
+    std::atomic<int> _process_calls {0};
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/time_sharing/priority.h b/be/src/vec/exec/executor/time_sharing/priority.h
new file mode 100644
index 0000000..f26e879
--- /dev/null
+++ b/be/src/vec/exec/executor/time_sharing/priority.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <atomic>
+#include <sstream>
+#include <string>
+
+namespace doris {
+namespace vectorized {
+
+/**
+ * Task (and split) priority is composed of a level and a within-level
+ * priority. Level decides which queue the split is placed in, while
+ * within-level priority decides which split is executed next in that level.
+ * <p>
+ * Tasks move from a lower to higher level as they exceed level thresholds
+ * of total scheduled time accrued to a task.
+ * <p>
+ * The priority within a level increases with the scheduled time accumulated
+ * in that level. This is necessary to achieve fairness when tasks acquire
+ * scheduled time at varying rates.
+ * <p>
+ * However, this priority is <b>not</b> equal to the task total accrued
+ * scheduled time. When a task graduates to a higher level, the level
+ * priority is set to the minimum current priority in the new level. This
+ * allows us to maintain instantaneous fairness in terms of scheduled time.
+ */
+class Priority {
+public:
+    Priority(int level, int64_t level_priority) : _level(level), _level_priority(level_priority) {}
+
+    // Queue level (index into the level thresholds); higher means more accrued time.
+    int level() const { return _level; }
+    // Within-level ordering value; smaller runs sooner.
+    int64_t level_priority() const { return _level_priority; }
+
+    // Lexicographic (level, level_priority) ordering: lower level first, then
+    // lower within-level priority.
+    bool operator<(const Priority& other) const {
+        if (_level != other._level) return _level < other._level;
+        return _level_priority < other._level_priority;
+    }
+
+    std::string to_string() const {
+        std::ostringstream os;
+        os << "Priority(level=" << _level << ", priority=" << _level_priority << ")";
+        return os.str();
+    }
+
+private:
+    int _level;
+    int64_t _level_priority;
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/time_sharing/split_concurrency_controller.h b/be/src/vec/exec/executor/time_sharing/split_concurrency_controller.h
new file mode 100644
index 0000000..3c803cc
--- /dev/null
+++ b/be/src/vec/exec/executor/time_sharing/split_concurrency_controller.h
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <chrono>
+#include <cmath>
+#include <cstdint>
+#include <stdexcept>
+
+#include "common/factory_creator.h"
+#include "common/status.h"
+
+namespace doris::vectorized {
+
+/**
+ * Feedback controller that adapts a task's target split concurrency toward
+ * TARGET_UTIL: the target is raised while utilization stays low and lowered
+ * when it is high, with adjustments rate-limited by an interval's worth of
+ * observed thread time. Not thread-safe; callers must provide external
+ * synchronization.
+ */
+class SplitConcurrencyController {
+    ENABLE_FACTORY_CREATOR(SplitConcurrencyController);
+
+public:
+    SplitConcurrencyController(int initial_concurrency,
+                               std::chrono::nanoseconds adjustment_interval)
+            : _adjustment_interval_nanos(adjustment_interval),
+              _target_concurrency(initial_concurrency) {}
+
+    // Periodic update with `nanos` of newly observed thread time; may raise the
+    // target when the task is under-utilized and already running at the target.
+    void update(uint64_t nanos, double utilization, int current_concurrency) {
+        _validate_args(nanos, utilization, current_concurrency);
+        _thread_nanos_since_adjust += nanos;
+        if (_should_increase_concurrency(current_concurrency, utilization)) {
+            _reset_adjust_counter();
+            ++_target_concurrency;
+        }
+    }
+
+    int target_concurrency() const { return _target_concurrency; }
+
+    // Called when a split completes after running `split_nanos`; may adjust the
+    // target in either direction once enough thread time has elapsed.
+    void split_finished(uint64_t split_nanos, double utilization, int current_concurrency) {
+        _validate_args(split_nanos, utilization, current_concurrency);
+        if (_should_adjust(split_nanos)) {
+            if (utilization > TARGET_UTIL && _target_concurrency > 1) {
+                _reset_adjust_counter();
+                --_target_concurrency;
+            } else if (utilization < TARGET_UTIL && current_concurrency >= _target_concurrency) {
+                _reset_adjust_counter();
+                ++_target_concurrency;
+            }
+        }
+    }
+
+private:
+    static constexpr double TARGET_UTIL = 0.5;
+    const std::chrono::nanoseconds _adjustment_interval_nanos;
+    int _target_concurrency;
+    uint64_t _thread_nanos_since_adjust = 0;
+
+    // Sanity checks on caller-supplied arguments.
+    // NOTE(review): `nanos >= 0` is a tautology for an unsigned type (and may
+    // trigger -Wtype-limits) — consider removing it or taking int64_t instead.
+    void _validate_args(uint64_t nanos, double util, int concurrency) const {
+        CHECK(nanos >= 0) << "Negative nanos";
+        CHECK(std::isfinite(util)) << "Invalid utilization";
+        CHECK(util >= 0) << "Negative utilization";
+        CHECK(concurrency >= 0) << "Negative concurrency";
+    }
+
+    bool _should_increase_concurrency(int curr_concurrency, double util) const {
+        return _thread_nanos_since_adjust >= _adjustment_interval_nanos.count() &&
+               util < TARGET_UTIL && curr_concurrency >= _target_concurrency;
+    }
+
+    // Adjust after a full interval, or immediately for splits longer than the
+    // time observed since the last adjustment.
+    bool _should_adjust(uint64_t split_nanos) const {
+        return _thread_nanos_since_adjust >= _adjustment_interval_nanos.count() ||
+               _thread_nanos_since_adjust >= split_nanos;
+    }
+
+    void _reset_adjust_counter() { _thread_nanos_since_adjust = 0; }
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp
new file mode 100644
index 0000000..82907f1
--- /dev/null
+++ b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp
@@ -0,0 +1,408 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/executor/time_sharing/time_sharing_task_executor.h"
+
+#include <functional>
+#include <iostream>
+
+#include "util/defer_op.h"
+#include "util/thread.h"
+#include "util/threadpool.h"
+#include "vec/exec/executor/time_sharing/time_sharing_task_handle.h"
+
+namespace doris {
+namespace vectorized {
+
+// Construct the executor.  `ticker` and `split_queue` are injectable (for
+// tests); when null, a SystemTicker and a default MultilevelSplitQueue are
+// created.
+TimeSharingTaskExecutor::TimeSharingTaskExecutor(
+        int runner_threads, int min_concurrency, int guaranteed_concurrency_per_task,
+        int max_concurrency_per_task, std::shared_ptr<Ticker> ticker,
+        std::chrono::milliseconds stuck_split_warning_threshold,
+        std::shared_ptr<MultilevelSplitQueue> split_queue)
+        : _runner_threads(runner_threads),
+          _min_concurrency(min_concurrency),
+          _guaranteed_concurrency_per_task(guaranteed_concurrency_per_task),
+          _max_concurrency_per_task(max_concurrency_per_task),
+          _ticker(ticker != nullptr ? ticker : std::make_shared<SystemTicker>()),
+          _stuck_split_warning_threshold(stuck_split_warning_threshold),
+          _waiting_splits(split_queue != nullptr ? std::move(split_queue)
+                                                 : std::make_shared<MultilevelSplitQueue>(2)) {}
+
+// Build the runner thread pool; the runner loops are submitted in start().
+// NOTE(review): max_threads is 2x _runner_threads but start() only submits
+// _runner_threads runners -- confirm the extra headroom is intentional.
+Status TimeSharingTaskExecutor::init() {
+    ThreadPoolBuilder builder("SplitRunner");
+    builder.set_min_threads(_runner_threads).set_max_threads(_runner_threads * 2);
+    RETURN_IF_ERROR(builder.build(&_thread_pool));
+    return Status::OK();
+}
+
+TimeSharingTaskExecutor::~TimeSharingTaskExecutor() {
+    // Stop exactly once: stop() interrupts the split queue so runner threads
+    // can exit, and shuts the thread pool down.
+    if (!_stopped.exchange(true)) {
+        stop();
+    }
+
+    // Collect every split still owned by live task handles under the lock;
+    // close() them outside it (close is expensive and needs no executor state).
+    std::vector<std::shared_ptr<PrioritizedSplitRunner>> splits_to_destroy;
+    {
+        std::lock_guard<std::mutex> guard(_mutex);
+        for (auto& [task_id, handle] : _tasks) {
+            if (handle->is_closed()) {
+                LOG(WARNING) << "task is already destroyed, task_id: " << task_id.to_string();
+                continue;
+            }
+            auto task_splits = handle->close();
+            splits_to_destroy.insert(splits_to_destroy.end(),
+                                     std::make_move_iterator(task_splits.begin()),
+                                     std::make_move_iterator(task_splits.end()));
+            _record_leaf_splits_size(guard);
+        }
+        _waiting_splits->remove_all(splits_to_destroy);
+    }
+
+    for (auto& split : splits_to_destroy) {
+        split->close(Status::OK());
+    }
+
+    // Shut the pool down unconditionally.  The old code early-returned when
+    // there were no splits to destroy, skipping this shutdown and relying
+    // solely on stop() having done it.
+    if (_thread_pool) {
+        _thread_pool->shutdown();
+    }
+}
+
+// Spawn all runner threads.  Holding _mutex only serializes start() against
+// other executor mutations; submit_func merely enqueues the runner closure.
+Status TimeSharingTaskExecutor::start() {
+    std::lock_guard<std::mutex> guard(_mutex);
+    for (int i = 0; i < _runner_threads; ++i) {
+        RETURN_IF_ERROR(_add_runner_thread());
+    }
+    return Status::OK();
+}
+
+// Stop accepting work and shut the runner pool down.
+void TimeSharingTaskExecutor::stop() {
+    // Wake any runner blocked in take() so it can observe shutdown and exit.
+    _waiting_splits->interrupt();
+    {
+        std::lock_guard<std::mutex> guard(_mutex);
+        _stopped = true;
+    }
+    if (_thread_pool) {
+        _thread_pool->shutdown();
+    }
+}
+
+// Submit one runner loop to the thread pool.  Each runner repeatedly takes the
+// highest-priority split off the shared queue, runs one quantum of it, and
+// either requeues it, parks it as blocked, or finishes it.
+Status TimeSharingTaskExecutor::_add_runner_thread() {
+    return _thread_pool->submit_func([this]() {
+        Thread::set_self_name("SplitRunner");
+
+        while (!_stopped) {
+            // take() returns nullptr when the queue is interrupted (stop()).
+            std::shared_ptr<PrioritizedSplitRunner> split = _waiting_splits->take();
+            if (!split) {
+                return;
+            }
+
+            // Track the split as running for the duration of this quantum.
+            {
+                std::lock_guard<std::mutex> guard(_mutex);
+                _running_splits.insert(split);
+            }
+            Defer defer {[&]() {
+                std::lock_guard<std::mutex> guard(_mutex);
+                _running_splits.erase(split);
+            }};
+
+            Result<SharedListenableFuture<Void>> blocked_future_result = split->process();
+            if (!blocked_future_result.has_value()) {
+                // Previously the runner thread exited here, silently dropping
+                // the split (its finished future never completed) and
+                // permanently shrinking the worker pool.  Report the failure
+                // and keep this thread serving other splits.
+                _split_finished(split, blocked_future_result.error());
+                continue;
+            }
+            auto blocked_future = blocked_future_result.value();
+
+            if (split->is_finished()) {
+                _split_finished(split, split->finished_status());
+            } else {
+                std::lock_guard<std::mutex> guard(_mutex);
+                if (blocked_future.is_done()) {
+                    // Not actually blocked: requeue for another quantum.
+                    _waiting_splits->offer(split);
+                } else {
+                    // Park the split until its future completes.
+                    // NOTE(review): if the future completes between is_done()
+                    // and add_callback() and the callback runs synchronously,
+                    // it would re-acquire _mutex while we hold it -- confirm
+                    // SharedListenableFuture defers callbacks in that case.
+                    _blocked_splits[split] = blocked_future;
+
+                    _blocked_splits[split].add_callback(
+                            [this, split](const Void& value, const Status& status) {
+                                if (status.ok()) {
+                                    std::lock_guard<std::mutex> guard(_mutex);
+                                    _blocked_splits.erase(split);
+                                    split->reset_level_priority();
+                                    _waiting_splits->offer(split);
+                                } else {
+                                    LOG(WARNING) << "blocked split is failed, split_id: "
+                                                 << split->split_id() << ", status: " << status;
+                                    _split_finished(split, status);
+                                }
+                            });
+                }
+            }
+        }
+    });
+}
+
+// Create and register the handle for a new task.  The handle shares the
+// executor-wide split queue so its priority updates feed scheduling.
+Result<std::shared_ptr<TaskHandle>> TimeSharingTaskExecutor::create_task(
+        const TaskId& task_id, std::function<double()> utilization_supplier,
+        int initial_split_concurrency, std::chrono::nanoseconds split_concurrency_adjust_frequency,
+        std::optional<int> max_concurrency_per_task) {
+    auto task_handle = std::make_shared<TimeSharingTaskHandle>(
+            task_id, _waiting_splits, utilization_supplier, initial_split_concurrency,
+            split_concurrency_adjust_frequency, max_concurrency_per_task);
+    RETURN_IF_ERROR_RESULT(task_handle->init());
+
+    std::lock_guard<std::mutex> lock(_mutex);
+
+    // NOTE(review): silently overwrites any handle already registered under
+    // the same task_id -- confirm ids are unique at the call sites.
+    _tasks[task_id] = task_handle;
+
+    return task_handle;
+}
+
+// Register an externally created handle under `task_id`.  The executor only
+// works with TimeSharingTaskHandle, so reject any other TaskHandle
+// implementation instead of silently storing a null pointer (the previous
+// behavior when the dynamic cast failed).
+Status TimeSharingTaskExecutor::add_task(const TaskId& task_id,
+                                         std::shared_ptr<TaskHandle> task_handle) {
+    auto handle =
+            std::dynamic_pointer_cast<doris::vectorized::TimeSharingTaskHandle>(task_handle);
+    if (handle == nullptr) {
+        return Status::InternalError("task handle is not a TimeSharingTaskHandle");
+    }
+    std::lock_guard<std::mutex> lock(_mutex);
+    _tasks[task_id] = std::move(handle);
+    return Status::OK();
+}
+
+// Unregister a task and tear down all of its splits.
+Status TimeSharingTaskExecutor::remove_task(std::shared_ptr<TaskHandle> task_handle) {
+    auto handle = std::dynamic_pointer_cast<TimeSharingTaskHandle>(task_handle);
+    if (handle == nullptr) {
+        // Guard against a foreign TaskHandle implementation; the old code
+        // would have dereferenced a null pointer below.
+        return Status::InternalError("task handle is not a TimeSharingTaskHandle");
+    }
+    std::vector<std::shared_ptr<PrioritizedSplitRunner>> splits_to_destroy;
+
+    {
+        std::lock_guard<std::mutex> guard(_mutex);
+        auto it = _tasks.find(handle->task_id());
+        // Already removed or already closed: nothing to do.  (The old code
+        // re-checked is_closed() a second time below; that was redundant.)
+        if (it == _tasks.end() || handle->is_closed()) {
+            return Status::OK();
+        }
+        _tasks.erase(it);
+
+        splits_to_destroy = handle->close();
+
+        // Purge the task's splits from every bookkeeping structure.
+        for (const auto& split : splits_to_destroy) {
+            _all_splits.erase(split);
+            _intermediate_splits.erase(split);
+            _blocked_splits.erase(split);
+        }
+        _waiting_splits->remove_all(splits_to_destroy);
+        _record_leaf_splits_size(guard);
+    }
+
+    // call destroy outside of synchronized block as it is expensive and doesn't need a lock on the task executor
+    for (auto& split : splits_to_destroy) {
+        split->close(Status::OK());
+    }
+
+    // Record which level the task completed in for stats.
+    int64_t thread_usage_nanos = handle->scheduled_nanos();
+    int level = MultilevelSplitQueue::compute_level(thread_usage_nanos);
+    _completed_tasks_per_level[level]++;
+
+    if (splits_to_destroy.empty()) {
+        return Status::OK();
+    }
+
+    // The removed splits freed worker capacity; promote queued splits of other
+    // tasks to replace them.
+    {
+        std::lock_guard<std::mutex> guard(_mutex);
+        _add_new_entrants(guard);
+        _record_leaf_splits_size(guard);
+    }
+    return Status::OK();
+}
+
+// Hand a batch of splits to a task.  Returns one finished-future per split.
+// Splits rejected because the task was already closed are closed on exit via
+// the Defer (which runs after the lock scope below has been left).
+Result<std::vector<SharedListenableFuture<Void>>> TimeSharingTaskExecutor::enqueue_splits(
+        std::shared_ptr<TaskHandle> task_handle, bool intermediate,
+        const std::vector<std::shared_ptr<SplitRunner>>& splits) {
+    std::vector<std::shared_ptr<PrioritizedSplitRunner>> splits_to_destroy;
+    Defer defer {[&]() {
+        for (auto& split : splits_to_destroy) {
+            split->close(Status::OK());
+        }
+    }};
+    std::vector<SharedListenableFuture<Void>> finished_futures;
+    // NOTE(review): the cast result is not null-checked; a foreign TaskHandle
+    // implementation would crash below -- confirm all callers pass
+    // TimeSharingTaskHandle.
+    auto handle = std::dynamic_pointer_cast<TimeSharingTaskHandle>(task_handle);
+    {
+        std::lock_guard<std::mutex> guard(_mutex);
+        for (const auto& task_split : splits) {
+            // NOTE(review): `task_id` is never used in this loop.
+            TaskId task_id = handle->task_id();
+            int split_id = handle->next_split_id();
+
+            auto prioritized_split =
+                    PrioritizedSplitRunner::create_shared(handle, split_id, task_split, _ticker);
+            RETURN_IF_ERROR_RESULT(prioritized_split->init());
+            if (intermediate) {
+                // Intermediate splits start immediately and bypass the
+                // per-task leaf-concurrency control.
+                if (handle->record_intermediate_split(prioritized_split)) {
+                    _start_intermediate_split(prioritized_split, guard);
+                } else {
+                    splits_to_destroy.push_back(prioritized_split);
+                }
+            } else {
+                // Leaf splits queue on the task and are promoted according to
+                // the guaranteed / minimum concurrency rules.
+                if (handle->enqueue_split(prioritized_split)) {
+                    _schedule_task_if_necessary(handle, guard);
+                    _add_new_entrants(guard);
+                } else {
+                    splits_to_destroy.push_back(prioritized_split);
+                }
+            }
+            finished_futures.push_back(prioritized_split->finished_future());
+        }
+        _record_leaf_splits_size(guard);
+    }
+    return finished_futures;
+}
+
+// Finalize a split that completed (successfully or with `status`), update the
+// owning task, and backfill the freed execution slot.
+void TimeSharingTaskExecutor::_split_finished(std::shared_ptr<PrioritizedSplitRunner> split,
+                                              const Status& status) {
+    // The per-level counter is atomic, so it is bumped outside the lock.
+    _completed_splits_per_level[split->priority().level()]++;
+    {
+        std::lock_guard<std::mutex> guard(_mutex);
+        _all_splits.erase(split);
+
+        auto task_handle = split->task_handle();
+        task_handle->split_finished(split);
+
+        // The finished split freed a slot: pull the task's next queued split
+        // and top the executor back up to its minimum concurrency.
+        _schedule_task_if_necessary(task_handle, guard);
+
+        _add_new_entrants(guard);
+        _record_leaf_splits_size(guard);
+    }
+    // call close outside of synchronized block as it is expensive and doesn't need a lock on the task executor
+    split->close(status);
+}
+
+// Ensure `task_handle` is running at least its guaranteed concurrency (capped
+// by its own per-task maximum, when set).  Caller must hold _mutex.
+void TimeSharingTaskExecutor::_schedule_task_if_necessary(
+        std::shared_ptr<TimeSharingTaskHandle> task_handle, std::lock_guard<std::mutex>& guard) {
+    int guaranteed_concurrency = std::min(
+            _guaranteed_concurrency_per_task,
+            task_handle->max_concurrency_per_task().value_or(std::numeric_limits<int>::max()));
+    int splits_to_schedule = guaranteed_concurrency - task_handle->running_leaf_splits();
+
+    for (int i = 0; i < splits_to_schedule; ++i) {
+        auto split = task_handle->poll_next_split();
+        if (!split) return;
+
+        _start_split(split, guard);
+        // NOTE(review): queue time is measured against steady_clock while the
+        // split's created_nanos comes from _ticker; with a non-system ticker
+        // (e.g. in tests) the two time bases diverge -- confirm.
+        auto elapsed_nanos = std::chrono::nanoseconds(
+                std::chrono::steady_clock::now().time_since_epoch().count() -
+                split->created_nanos());
+        _split_queued_time
+                << std::chrono::duration_cast<std::chrono::microseconds>(elapsed_nanos).count();
+    }
+    _record_leaf_splits_size(guard);
+}
+
+// Top the executor up to _min_concurrency running leaf splits, pulling queued
+// splits from registered tasks via _poll_next_split_worker.  Caller holds
+// _mutex.
+void TimeSharingTaskExecutor::_add_new_entrants(std::lock_guard<std::mutex>& guard) {
+    // Intermediate splits do not count against the leaf-concurrency budget.
+    int running = _all_splits.size() - _intermediate_splits.size();
+    for (int i = 0; i < _min_concurrency - running; i++) {
+        auto split = _poll_next_split_worker(guard);
+        if (!split) {
+            break;
+        }
+
+        // NOTE(review): same steady_clock vs _ticker time-base mix as in
+        // _schedule_task_if_necessary -- confirm.
+        auto elapsed_nanos = std::chrono::nanoseconds(
+                std::chrono::steady_clock::now().time_since_epoch().count() -
+                split->created_nanos());
+        _split_queued_time
+                << std::chrono::duration_cast<std::chrono::microseconds>(elapsed_nanos).count();
+        _start_split(split, guard);
+    }
+}
+
+// Start an intermediate split.  It is tracked in _intermediate_splits so it is
+// excluded from the leaf-concurrency accounting in _add_new_entrants().
+void TimeSharingTaskExecutor::_start_intermediate_split(
+        std::shared_ptr<PrioritizedSplitRunner> split, std::lock_guard<std::mutex>& guard) {
+    _start_split(split, guard);
+    _intermediate_splits.insert(split);
+}
+
+// Register a split as active and make it available to the runner threads.
+// Caller must hold _mutex (enforced by the guard parameter).
+void TimeSharingTaskExecutor::_start_split(std::shared_ptr<PrioritizedSplitRunner> split,
+                                           std::lock_guard<std::mutex>& guard) {
+    _all_splits.insert(split);
+    _waiting_splits->offer(split);
+}
+
+// Walk the task list and return the first split any task can run; the
+// producing task is then erased and re-inserted, intending round-robin
+// scheduling across tasks.  Caller must hold _mutex.
+// NOTE(review): _tasks is an unordered_map, so erase + reinsert does NOT move
+// the entry to the "end" -- iteration order is unspecified, and the intended
+// round-robin fairness is not actually achieved.  Consider an ordered
+// structure (e.g. a rotating list of task ids) if fairness matters.
+std::shared_ptr<PrioritizedSplitRunner> TimeSharingTaskExecutor::_poll_next_split_worker(
+        std::lock_guard<std::mutex>& guard) {
+    for (auto it = _tasks.begin(); it != _tasks.end();) {
+        auto task = it->second;
+        // Skip tasks already running their configured maximum number of splits.
+        if (task->running_leaf_splits() >=
+            task->max_concurrency_per_task().value_or(_max_concurrency_per_task)) {
+            ++it;
+            continue;
+        }
+
+        auto split = task->poll_next_split();
+        if (split) {
+            // Re-insert the task (see the NOTE above about ordering).
+            auto task_copy = task;
+            auto task_id = it->first;
+            it = _tasks.erase(it);
+            _tasks[task_id] = task_copy;
+            return split;
+        }
+        ++it;
+    }
+    return nullptr;
+}
+
+// Intentionally a no-op for now.  The original metric-recording code is kept
+// below (translated) for when leaf-split-size tracking is re-enabled.
+void TimeSharingTaskExecutor::_record_leaf_splits_size(std::lock_guard<std::mutex>& guard) {
+    //    auto now = _ticker->read();
+    //    auto time_difference = now - _last_leaf_splits_size_record_time;
+    //
+    //    if (time_difference > 0) {
+    //        // Record the current number of leaf splits and its duration.
+    //        //        _leaf_splits_size.add(_last_leaf_splits_size, std::chrono::nanoseconds(time_difference));
+    //        _last_leaf_splits_size_record_time = now;
+    //    }
+    //    // always record new lastLeafSplitsSize as it might have changed
+    //    // even if timeDifference is 0
+    //    _last_leaf_splits_size = _all_splits.size() - _intermediate_splits.size();
+}
+
+// Wake up everything that may be waiting on executor state: the split queue
+// takers and any waiter on the executor's condition variable.
+void TimeSharingTaskExecutor::_interrupt() {
+    std::lock_guard<std::mutex> guard(_mutex);
+    _waiting_splits->interrupt();
+    _condition.notify_all();
+}
+
+// Count how many registered tasks currently sit at the given priority level.
+int64_t TimeSharingTaskExecutor::_get_running_tasks_for_level(int level) const {
+    std::lock_guard<std::mutex> guard(_mutex);
+    int64_t matches = 0;
+    for (const auto& entry : _tasks) {
+        if (entry.second->priority().level() == level) {
+            ++matches;
+        }
+    }
+    return matches;
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h
new file mode 100644
index 0000000..c3fe520
--- /dev/null
+++ b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h
@@ -0,0 +1,208 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <queue>
+#include <thread>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "bvar/latency_recorder.h"
+#include "util/threadpool.h"
+#include "vec/exec/executor/listenable_future.h"
+#include "vec/exec/executor/stats.h"
+#include "vec/exec/executor/task_executor.h"
+#include "vec/exec/executor/ticker.h"
+#include "vec/exec/executor/time_sharing/multilevel_split_queue.h"
+#include "vec/exec/executor/time_sharing/prioritized_split_runner.h"
+
+namespace doris {
+namespace vectorized {
+
+/**
+ * ThreadSafe
+ */
+class TimeSharingTaskExecutor : public TaskExecutor {
+    ENABLE_FACTORY_CREATOR(TimeSharingTaskExecutor);
+
+public:
+    // `threads`: number of runner threads pulling splits off the shared queue.
+    // `min_concurrency`: minimum number of leaf splits kept running overall.
+    // `guaranteed_concurrency_per_task` / `max_concurrency_per_task`: per-task
+    // scheduling bounds.  `ticker` and `split_queue` are injectable for tests.
+    TimeSharingTaskExecutor(int threads, int min_concurrency, int guaranteed_concurrency_per_task,
+                            int max_concurrency_per_task, std::shared_ptr<Ticker> ticker,
+                            std::chrono::milliseconds stuck_split_warning_threshold =
+                                    std::chrono::milliseconds(60000),
+                            std::shared_ptr<MultilevelSplitQueue> split_queue = nullptr);
+
+    ~TimeSharingTaskExecutor() override;
+
+    // Builds the runner thread pool; start() actually submits the runners.
+    Status init() override;
+
+    Status start() override;
+    void stop() override;
+
+    Result<std::shared_ptr<TaskHandle>> create_task(
+            const TaskId& task_id, std::function<double()> utilization_supplier,
+            int initial_split_concurrency,
+            std::chrono::nanoseconds split_concurrency_adjust_frequency,
+            std::optional<int> max_concurrency_per_task) override;
+
+    Status add_task(const TaskId& task_id, std::shared_ptr<TaskHandle> task_handle) override;
+
+    Status remove_task(std::shared_ptr<TaskHandle> task_handle) override;
+
+    Result<std::vector<SharedListenableFuture<Void>>> enqueue_splits(
+            std::shared_ptr<TaskHandle> task_handle, bool intermediate,
+            const std::vector<std::shared_ptr<SplitRunner>>& splits) override;
+
+    size_t waiting_splits_size() const { return _waiting_splits->size(); }
+
+    size_t intermediate_splits_size() const {
+        std::lock_guard<std::mutex> guard(_mutex);
+        return _intermediate_splits.size();
+    }
+
+    // Fixed: take _mutex like the sibling size accessors.  These previously
+    // read _running_splits/_blocked_splits unsynchronized, racing with the
+    // runner threads that mutate them.
+    size_t running_splits_size() const {
+        std::lock_guard<std::mutex> guard(_mutex);
+        return _running_splits.size();
+    }
+
+    size_t blocked_splits_size() const {
+        std::lock_guard<std::mutex> guard(_mutex);
+        return _blocked_splits.size();
+    }
+
+    size_t total_splits_size() const {
+        std::lock_guard<std::mutex> guard(_mutex);
+        return _all_splits.size();
+    }
+
+    size_t tasks_size() const {
+        std::lock_guard<std::mutex> guard(_mutex);
+        return _tasks.size();
+    }
+
+    // Per-level stats counters are atomics and can be read without the lock.
+    int64_t completed_tasks_level0() const { return _completed_tasks_per_level[0]; }
+
+    int64_t completed_tasks_level1() const { return _completed_tasks_per_level[1]; }
+
+    int64_t completed_tasks_level2() const { return _completed_tasks_per_level[2]; }
+
+    int64_t completed_tasks_level3() const { return _completed_tasks_per_level[3]; }
+
+    int64_t completed_tasks_level4() const { return _completed_tasks_per_level[4]; }
+
+    int64_t completed_splits_level0() const { return _completed_splits_per_level[0]; }
+
+    int64_t completed_splits_level1() const { return _completed_splits_per_level[1]; }
+
+    int64_t completed_splits_level2() const { return _completed_splits_per_level[2]; }
+
+    int64_t completed_splits_level3() const { return _completed_splits_per_level[3]; }
+
+    int64_t completed_splits_level4() const { return _completed_splits_per_level[4]; }
+
+    int64_t running_tasks_level0() const { return _get_running_tasks_for_level(0); }
+
+    int64_t running_tasks_level1() const { return _get_running_tasks_for_level(1); }
+
+    int64_t running_tasks_level2() const { return _get_running_tasks_for_level(2); }
+
+    int64_t running_tasks_level3() const { return _get_running_tasks_for_level(3); }
+
+    int64_t running_tasks_level4() const { return _get_running_tasks_for_level(4); }
+
+private:
+    // RAII tracker for _running_splits membership.
+    // NOTE(review): not referenced by the .cpp (the runner loop tracks splits
+    // inline with a Defer) -- confirm before removing.
+    class ScopedRunnerTracker {
+    public:
+        ScopedRunnerTracker(
+                std::mutex& mutex,
+                std::unordered_set<std::shared_ptr<PrioritizedSplitRunner>>& running_splits,
+                std::shared_ptr<PrioritizedSplitRunner> split)
+                : _mutex(mutex), _running_splits(running_splits), _split(std::move(split)) {
+            std::lock_guard<std::mutex> guard(_mutex);
+            _running_splits.insert(_split);
+        }
+
+        ~ScopedRunnerTracker() {
+            std::lock_guard<std::mutex> guard(_mutex);
+            _running_splits.erase(_split);
+        }
+
+    private:
+        std::mutex& _mutex;
+        std::unordered_set<std::shared_ptr<PrioritizedSplitRunner>>& _running_splits;
+        std::shared_ptr<PrioritizedSplitRunner> _split;
+    };
+
+    // NOTE(review): declared but never defined or instantiated in the visible
+    // sources -- confirm before removing.
+    class TaskRunner {
+    public:
+        TaskRunner(TimeSharingTaskExecutor& executor);
+        void run();
+        void stop();
+
+    private:
+        TimeSharingTaskExecutor& _executor;
+        std::atomic<bool> _running {true};
+    };
+
+    Status _add_runner_thread();
+    void _schedule_task_if_necessary(std::shared_ptr<TimeSharingTaskHandle> task_handle,
+                                     std::lock_guard<std::mutex>& guard);
+    void _add_new_entrants(std::lock_guard<std::mutex>& guard);
+    void _start_intermediate_split(std::shared_ptr<PrioritizedSplitRunner> split,
+                                   std::lock_guard<std::mutex>& guard);
+    void _start_split(std::shared_ptr<PrioritizedSplitRunner> split,
+                      std::lock_guard<std::mutex>& guard);
+    std::shared_ptr<PrioritizedSplitRunner> _poll_next_split_worker(
+            std::lock_guard<std::mutex>& guard);
+    void _record_leaf_splits_size(std::lock_guard<std::mutex>& guard);
+    void _split_finished(std::shared_ptr<PrioritizedSplitRunner> split, const Status& status);
+    void _interrupt();
+
+    int64_t _get_running_tasks_for_level(int level) const;
+
+    std::unique_ptr<ThreadPool> _thread_pool;
+    const int _runner_threads;
+    const int _min_concurrency;
+    const int _guaranteed_concurrency_per_task;
+    const int _max_concurrency_per_task;
+    std::shared_ptr<Ticker> _ticker;
+    const std::chrono::milliseconds _stuck_split_warning_threshold;
+    std::shared_ptr<MultilevelSplitQueue> _waiting_splits;
+
+    // _mutex guards every task/split bookkeeping container below.
+    mutable std::mutex _mutex;
+    std::condition_variable _condition;
+    std::atomic<bool> _stopped {false};
+
+    std::vector<std::unique_ptr<TaskRunner>> _task_runners;
+
+    std::unordered_map<TaskId, std::shared_ptr<TimeSharingTaskHandle>> _tasks;
+
+    std::unordered_set<std::shared_ptr<PrioritizedSplitRunner>> _all_splits;
+    std::unordered_set<std::shared_ptr<PrioritizedSplitRunner>> _intermediate_splits;
+    std::unordered_set<std::shared_ptr<PrioritizedSplitRunner>> _running_splits;
+    std::unordered_map<std::shared_ptr<PrioritizedSplitRunner>, SharedListenableFuture<Void>>
+            _blocked_splits;
+    std::array<std::atomic<int64_t>, 5> _completed_tasks_per_level = {0, 0, 0, 0, 0};
+    std::array<std::atomic<int64_t>, 5> _completed_splits_per_level = {0, 0, 0, 0, 0};
+
+    bvar::LatencyRecorder _split_queued_time;
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/time_sharing/time_sharing_task_handle.cpp b/be/src/vec/exec/executor/time_sharing/time_sharing_task_handle.cpp
new file mode 100644
index 0000000..0979b72
--- /dev/null
+++ b/be/src/vec/exec/executor/time_sharing/time_sharing_task_handle.cpp
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/executor/time_sharing/time_sharing_task_handle.h"
+
+#include "vec/exec/executor/time_sharing/multilevel_split_queue.h"
+
+namespace doris {
+namespace vectorized {
+
+// Build a handle for one task.  The concurrency controller starts at
+// initial_split_concurrency and adapts based on the task's reported
+// utilization; the split queue is shared with the owning executor.
+TimeSharingTaskHandle::TimeSharingTaskHandle(
+        const TaskId& task_id, std::shared_ptr<MultilevelSplitQueue> split_queue,
+        std::function<double()> utilization_supplier, int initial_split_concurrency,
+        std::chrono::nanoseconds split_concurrency_adjust_frequency,
+        std::optional<int> max_concurrency_per_task)
+        : _task_id(task_id),
+          _split_queue(std::move(split_queue)),
+          _utilization_supplier(std::move(utilization_supplier)),
+          _max_concurrency_per_task(max_concurrency_per_task),
+          _concurrency_controller(initial_split_concurrency, split_concurrency_adjust_frequency) {}
+
+// No deferred initialization is needed today; kept for interface symmetry.
+Status TimeSharingTaskHandle::init() {
+    return Status::OK();
+}
+
+//Priority TimeSharingTaskHandle::add_scheduled_nanos(int64_t duration_nanos) {
+//    std::lock_guard<std::mutex> lock(_mutex);
+//    _concurrency_controller.update(duration_nanos, _utilization_supplier(),
+//                                   _running_leaf_splits.size());
+//    _scheduled_nanos += duration_nanos;
+//
+//    Priority new_priority =
+//            _split_queue->update_priority(_priority.load(), duration_nanos, _scheduled_nanos);
+//
+//    _priority.store(new_priority);
+//    return new_priority;
+//}
+
+// Charge `duration_nanos` of CPU time to this task and recompute its queue
+// priority.  All priority state is guarded by _mutex (see priority()).
+Priority TimeSharingTaskHandle::add_scheduled_nanos(int64_t duration_nanos) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    _concurrency_controller.update(duration_nanos, _utilization_supplier(),
+                                   _running_leaf_splits.size());
+    _scheduled_nanos += duration_nanos;
+
+    Priority new_priority =
+            _split_queue->update_priority(_priority, duration_nanos, _scheduled_nanos);
+
+    _priority = new_priority;
+    return new_priority;
+}
+
+//Priority TimeSharingTaskHandle::reset_level_priority() {
+//    std::lock_guard<std::mutex> lock(_mutex);
+//
+//    Priority current_priority = _priority.load();
+//    int64_t level_min_priority =
+//            _split_queue->get_level_min_priority(current_priority.level(), _scheduled_nanos);
+//
+//    if (current_priority.level_priority() < level_min_priority) {
+//        Priority new_priority(current_priority.level(), level_min_priority);
+//        _priority.store(new_priority);
+//        return new_priority;
+//    }
+//
+//    return current_priority;
+//}
+
+// Raise the task's intra-level priority to at least the level's current
+// minimum, so a split returning from a blocked state does not jump ahead of
+// splits that kept running.  Returns the (possibly unchanged) priority.
+Priority TimeSharingTaskHandle::reset_level_priority() {
+    std::lock_guard<std::mutex> lock(_mutex);
+
+    Priority current_priority = _priority;
+    int64_t level_min_priority =
+            _split_queue->get_level_min_priority(current_priority.level(), _scheduled_nanos);
+
+    if (current_priority.level_priority() < level_min_priority) {
+        Priority new_priority(current_priority.level(), level_min_priority);
+        _priority = new_priority;
+        return new_priority;
+    }
+
+    return current_priority;
+}
+
+// _closed is atomic, so this accessor does not need _mutex.
+bool TimeSharingTaskHandle::is_closed() const {
+    return _closed.load();
+}
+
+//Priority TimeSharingTaskHandle::priority() const {
+//    return _priority.load();
+//}
+
+// _priority is a plain Priority guarded by _mutex (the std::atomic<Priority>
+// variant was abandoned; see the commented-out version above).
+Priority TimeSharingTaskHandle::priority() const {
+    std::lock_guard<std::mutex> lock(_mutex);
+    return _priority;
+}
+
+TaskId TimeSharingTaskHandle::task_id() const {
+    return _task_id;
+}
+
+// Immutable after construction; no locking required.
+std::optional<int> TimeSharingTaskHandle::max_concurrency_per_task() const {
+    return _max_concurrency_per_task;
+}
+
+// Mark the task closed and hand every split (running intermediate, running
+// leaf, and queued leaf) back to the caller, which owns destroying them.
+std::vector<std::shared_ptr<PrioritizedSplitRunner>> TimeSharingTaskHandle::close() {
+    std::lock_guard<std::mutex> lock(_mutex);
+
+    // exchange() makes close() idempotent: only the first caller gets splits.
+    if (_closed.exchange(true)) {
+        return {};
+    }
+
+    std::vector<std::shared_ptr<PrioritizedSplitRunner>> result;
+    result.reserve(_running_intermediate_splits.size() + _running_leaf_splits.size() +
+                   _queued_leaf_splits.size());
+
+    result.insert(result.end(), _running_intermediate_splits.begin(),
+                  _running_intermediate_splits.end());
+    result.insert(result.end(), _running_leaf_splits.begin(), _running_leaf_splits.end());
+
+    // std::queue has no iterators; drain it element by element.
+    while (!_queued_leaf_splits.empty()) {
+        result.push_back(_queued_leaf_splits.front());
+        _queued_leaf_splits.pop();
+    }
+
+    _running_intermediate_splits.clear();
+    _running_leaf_splits.clear();
+
+    return result;
+}
+
+// Queue a leaf split for later scheduling.  Returns false when the task has
+// already been closed, in which case the caller keeps ownership of the split.
+bool TimeSharingTaskHandle::enqueue_split(std::shared_ptr<PrioritizedSplitRunner> split) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    if (_closed) {
+        return false;
+    }
+    _queued_leaf_splits.push(std::move(split));
+    return true;
+}
+
+// Track an intermediate split as running.  Rejected (returns false) once the
+// task has been closed; the caller then keeps ownership of the split.
+bool TimeSharingTaskHandle::record_intermediate_split(
+        std::shared_ptr<PrioritizedSplitRunner> split) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    if (_closed) {
+        return false;
+    }
+    _running_intermediate_splits.push_back(std::move(split));
+    return true;
+}
+
+// Number of leaf splits currently scheduled for this task.
+int TimeSharingTaskHandle::running_leaf_splits() const {
+    std::lock_guard<std::mutex> lock(_mutex);
+    // Explicit cast: size() is size_t while the API returns int (the old
+    // implicit narrowing conversion drew compiler warnings).
+    return static_cast<int>(_running_leaf_splits.size());
+}
+
+// Total CPU time charged to this task so far, in nanoseconds.
+int64_t TimeSharingTaskHandle::scheduled_nanos() const {
+    std::lock_guard<std::mutex> lock(_mutex);
+    return _scheduled_nanos;
+}
+
+// Pop the next queued leaf split and mark it running, or return nullptr when
+// the task is closed, already at its adaptive concurrency target, or out of
+// queued splits.
+std::shared_ptr<PrioritizedSplitRunner> TimeSharingTaskHandle::poll_next_split() {
+    std::lock_guard<std::mutex> lock(_mutex);
+
+    if (_closed) {
+        return nullptr;
+    }
+
+    // NOTE(review): size_t vs int comparison; benign as long as
+    // target_concurrency() stays >= 1 -- confirm.
+    if (_running_leaf_splits.size() >= _concurrency_controller.target_concurrency()) {
+        return nullptr;
+    }
+
+    if (_queued_leaf_splits.empty()) {
+        return nullptr;
+    }
+
+    auto split = _queued_leaf_splits.front();
+    _queued_leaf_splits.pop();
+    _running_leaf_splits.push_back(split);
+    return split;
+}
+
+// Report a finished split to the concurrency controller and remove it from
+// whichever running list holds it.
+void TimeSharingTaskHandle::split_finished(std::shared_ptr<PrioritizedSplitRunner> split) {
+    std::lock_guard<std::mutex> lock(_mutex);
+    // NOTE(review): the controller is fed for intermediate splits as well as
+    // leaf splits -- confirm that is intended.
+    _concurrency_controller.split_finished(split->scheduled_nanos(), _utilization_supplier(),
+                                           _running_leaf_splits.size());
+
+    auto it = std::find(_running_intermediate_splits.begin(), _running_intermediate_splits.end(),
+                        split);
+    if (it != _running_intermediate_splits.end()) {
+        _running_intermediate_splits.erase(it);
+    }
+
+    // A split lives in at most one of the two lists; the second lookup is a
+    // no-op when the first one matched.
+    it = std::find(_running_leaf_splits.begin(), _running_leaf_splits.end(), split);
+    if (it != _running_leaf_splits.end()) {
+        _running_leaf_splits.erase(it);
+    }
+}
+
+// Hand out monotonically increasing split ids.  Post-increment on a
+// std::atomic is equivalent to fetch_add(1): atomic, returns the old value.
+int TimeSharingTaskHandle::next_split_id() {
+    return _next_split_id++;
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/executor/time_sharing/time_sharing_task_handle.h b/be/src/vec/exec/executor/time_sharing/time_sharing_task_handle.h
new file mode 100644
index 0000000..3373a68
--- /dev/null
+++ b/be/src/vec/exec/executor/time_sharing/time_sharing_task_handle.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <queue>
+#include <vector>
+
+#include "vec/exec/executor/task_handle.h"
+#include "vec/exec/executor/task_id.h"
+#include "vec/exec/executor/time_sharing/prioritized_split_runner.h"
+#include "vec/exec/executor/time_sharing/priority.h"
+#include "vec/exec/executor/time_sharing/split_concurrency_controller.h"
+
+namespace doris {
+namespace vectorized {
+
+class MultilevelSplitQueue;
+
+// Task-level bookkeeping for the time sharing executor: owns the queued and
+// running splits of a single task, accumulates the task's scheduled time,
+// and carries the priority used to order it in the split queue.
+// Mutable state is protected by _mutex; _closed and _next_split_id are
+// additionally atomic.
+class TimeSharingTaskHandle : public TaskHandle {
+    ENABLE_FACTORY_CREATOR(TimeSharingTaskHandle);
+
+public:
+    // utilization_supplier feeds the current utilization value into the
+    // split concurrency controller when splits finish.
+    TimeSharingTaskHandle(const TaskId& task_id, std::shared_ptr<MultilevelSplitQueue> split_queue,
+                          std::function<double()> utilization_supplier,
+                          int initial_split_concurrency,
+                          std::chrono::nanoseconds split_concurrency_adjust_frequency,
+                          std::optional<int> max_concurrency_per_task);
+
+    Status init() override;
+
+    Priority add_scheduled_nanos(int64_t duration_nanos);
+    Priority reset_level_priority();
+    bool is_closed() const override;
+    Priority priority() const;
+    TaskId task_id() const override;
+    std::optional<int> max_concurrency_per_task() const;
+    // Closes the handle and returns the splits it still held, presumably so
+    // the caller can clean them up — confirm against the .cpp.
+    std::vector<std::shared_ptr<PrioritizedSplitRunner>> close();
+    bool enqueue_split(std::shared_ptr<PrioritizedSplitRunner> split);
+    bool record_intermediate_split(std::shared_ptr<PrioritizedSplitRunner> split);
+    int running_leaf_splits() const;
+    int64_t scheduled_nanos() const;
+    // Pops the next queued leaf split if the target concurrency allows it;
+    // returns nullptr when closed, at the limit, or when nothing is queued.
+    std::shared_ptr<PrioritizedSplitRunner> poll_next_split();
+    // Feeds the finished split's runtime into the concurrency controller
+    // and removes it from the running lists.
+    void split_finished(std::shared_ptr<PrioritizedSplitRunner> split);
+    // Monotonically increasing id generator for this task's splits.
+    int next_split_id();
+
+private:
+    mutable std::mutex _mutex;
+    std::atomic<bool> _closed {false};
+    const TaskId _task_id;
+    std::shared_ptr<MultilevelSplitQueue> _split_queue;
+    std::function<double()> _utilization_supplier;
+    std::optional<int> _max_concurrency_per_task;
+    SplitConcurrencyController _concurrency_controller;
+
+    std::queue<std::shared_ptr<PrioritizedSplitRunner>> _queued_leaf_splits;
+    std::vector<std::shared_ptr<PrioritizedSplitRunner>> _running_leaf_splits;
+    std::vector<std::shared_ptr<PrioritizedSplitRunner>> _running_intermediate_splits;
+    int64_t _scheduled_nanos {0};
+    std::atomic<int> _next_split_id {0};
+    // Plain member guarded by _mutex; an atomic<Priority> variant was
+    // abandoned.
+    Priority _priority {0, 0};
+};
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/test/vec/exec/executor/time_sharing/multilevel_split_queue_test.cpp b/be/test/vec/exec/executor/time_sharing/multilevel_split_queue_test.cpp
new file mode 100644
index 0000000..cef23ff
--- /dev/null
+++ b/be/test/vec/exec/executor/time_sharing/multilevel_split_queue_test.cpp
@@ -0,0 +1,260 @@
+//// Licensed to the Apache Software Foundation (ASF) under one
+//// or more contributor license agreements.  See the NOTICE file
+//// distributed with this work for additional information
+//// regarding copyright ownership.  The ASF licenses this file
+//// to you under the Apache License, Version 2.0 (the
+//// "License"); you may not use this file except in compliance
+//// with the License.  You may obtain a copy of the License at
+////
+////   http://www.apache.org/licenses/LICENSE-2.0
+////
+//// Unless required by applicable law or agreed to in writing,
+//// software distributed under the License is distributed on an
+//// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+//// KIND, either express or implied.  See the License for the
+//// specific language governing permissions and limitations
+//// under the License.
+//
+//#include <gtest/gtest.h>
+//#include <atomic>
+//#include <thread>
+//#include "vec/exec/scan/MultilevelSplitQueue.h"
+//
+//namespace doris::vectorized {
+//
+//// 测试用SplitRunner实现
+//class TestSplitRunner : public PrioritizedSplitRunner {
+//public:
+//    TestSplitRunner(Priority priority)
+//        : PrioritizedSplitRunner(_global_cpu_time_micros, _global_scheduled_time_micros, _blocked_quanta_wall_time, _unblocked_quanta_wall_time), _priority(priority) {}
+//
+//    Priority getPriority() const override { return _priority; }
+//    void updatePriority(Priority new_pri) { _priority = new_pri; }
+//
+//private:
+//    Priority _priority;
+//    CounterStats _global_cpu_time_micros;
+//    CounterStats _global_scheduled_time_micros;
+//
+//    TimeStats _blocked_quanta_wall_time;
+//    TimeStats _unblocked_quanta_wall_time;
+//};
+//
+//class MultilevelSplitQueueTest : public testing::Test {
+//protected:
+//    void SetUp() override {
+//        queue = std::make_unique<MultilevelSplitQueue>(2.0);
+//    }
+//
+//    std::shared_ptr<TestSplitRunner> make_split(int level, int64_t pri) {
+//        return std::make_shared<TestSplitRunner>(Priority(level, pri));
+//    }
+//
+//    std::unique_ptr<MultilevelSplitQueue> queue;
+//};
+//
+//TEST_F(MultilevelSplitQueueTest, BasicFunction) {
+//    // 验证空队列
+//    EXPECT_EQ(queue->size(), 0);
+//
+//    // 添加任务
+//    auto split = make_split(0, 100);
+//    queue->addSplit(split);
+//    EXPECT_EQ(queue->size(), 1);
+//
+//    // 取出任务
+//    auto task = queue->pollSplit();
+//    ASSERT_NE(task, nullptr);
+//    EXPECT_EQ(task->getPriority().getLevel(), 0);
+//    EXPECT_EQ(queue->size(), 0);
+//}
+//
+//TEST_F(MultilevelSplitQueueTest, LevelContributionCap) {
+//    // 创建两个测试任务
+//    auto split0 = make_split(0, 0);
+//    auto split1 = make_split(0, 0);
+//
+//    // 获取级别阈值数组(假设实现中有这个静态成员)
+//    const auto& levelThresholds = MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS;
+//
+//    int64_t scheduledNanos = 0;
+//    for (size_t i = 0; i < levelThresholds.size() - 1; ++i) {
+//        // 计算需要增加的时间量(秒转纳秒)
+//        const int64_t levelAdvance =
+//                (levelThresholds[i + 1] - levelThresholds[i]) * 1'000'000'000LL;
+//        // 模拟任务累积时间
+//        scheduledNanos += levelAdvance;
+//        split0->updatePriority(queue->updatePriority(
+//                split0->getPriority(), levelAdvance, scheduledNanos));
+//        split1->updatePriority(queue->updatePriority(
+//                split1->getPriority(), levelAdvance, scheduledNanos));
+//
+//        // 验证级别提升
+//        EXPECT_EQ(split0->getPriority().getLevel(), i + 1);
+//        EXPECT_EQ(split1->getPriority().getLevel(), i + 1);
+//
+//        // 验证级别累计时间(应用贡献上限)
+//        const int64_t expected =
+//                2 * std::min(levelAdvance, MultilevelSplitQueue::LEVEL_CONTRIBUTION_CAP);
+//        EXPECT_EQ(queue->get_level_scheduled_time(i), expected);
+//        EXPECT_EQ(queue->get_level_scheduled_time(i + 1), 0);
+//    }
+//}
+//
+//TEST_F(MultilevelSplitQueueTest, LevelContributionWithCap) {
+//    // 创建队列(levelTimeMultiplier=2)
+//    MultilevelSplitQueue queue(2.0);
+//
+//    // 创建测试任务并初始化优先级
+//    auto split = make_split(0, 0);
+//    queue.addSplit(split);
+//
+//    // 模拟运行10分钟(转换为纳秒)
+//    constexpr int64_t quantaNanos = 10LL * 60 * 1'000'000'000; // 10分钟
+//    int64_t scheduledNanos = 0;
+//
+//    // 更新优先级触发时间分配
+//    auto task = std::static_pointer_cast<TestSplitRunner>(queue.pollSplit());
+//    scheduledNanos += quantaNanos;
+//    Priority new_pri = queue.updatePriority(
+//            task->getPriority(),
+//            quantaNanos,
+//            scheduledNanos
+//    );
+//    task->updatePriority(new_pri);
+//
+//    // 计算受限制的时间贡献
+//    int64_t cappedNanos = std::min(quantaNanos, MultilevelSplitQueue::LEVEL_CONTRIBUTION_CAP);
+//    const auto& levels = MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS;
+//
+//    // 验证每个级别的时间分配
+//    for (size_t i = 0; i < levels.size() - 1; ++i) {
+//        // 计算当前级别时间阈值(转换为纳秒)
+//        int64_t levelThreshold = static_cast<int64_t>((levels[i + 1] - levels[i])) * 1'000'000'000L;
+//        int64_t expected = std::min(levelThreshold, cappedNanos);
+//
+//        // 验证当前级别累计时间
+//        EXPECT_EQ(queue.get_level_scheduled_time(i), expected);
+//        cappedNanos -= expected;
+//    }
+//
+//    // 验证剩余时间分配到最后一个级别
+////    EXPECT_EQ(queue.get_level_scheduled_time(levels.size() - 1), cappedNanos);
+//}
+//
+//TEST_F(MultilevelSplitQueueTest, TimeCompensation) {
+//    // 初始状态验证
+//    EXPECT_EQ(queue->get_level_scheduled_time(2), 0);
+//
+//    int64_t scheduledNanos = 0L;
+//
+//    // 1. 创建并添加任务
+//    auto split = make_split(2, 200);
+//    queue->addSplit(split);
+//
+//    // 2. 取出任务(触发时间补偿)
+//    auto task = std::static_pointer_cast<TestSplitRunner>(queue->pollSplit());
+//    ASSERT_NE(task, nullptr);
+//
+//    // 3. 模拟任务运行5秒并更新优先级
+//    const int64_t RUN_TIME = 5'000'000'000; // 5秒
+//    scheduledNanos += RUN_TIME;
+//    Priority new_pri = queue->updatePriority(
+//        task->getPriority(),
+//        RUN_TIME,
+//        scheduledNanos
+//    );
+//    task->updatePriority(new_pri);
+//
+//    // 4. 重新加入队列(此时level2已有时间记录)
+//    queue->addSplit(task);
+//
+//    // 5. 再次添加新任务触发补偿
+//    auto new_split = make_split(2, 300);
+//    queue->addSplit(new_split);
+//
+//    // 验证补偿逻辑
+//    int64_t level2_time = queue->get_level_scheduled_time(2);
+//    int64_t level0_time = queue->get_level0_target_time();
+//
+//    // 预期补偿 = (level0_time / 4) + 5秒
+//    int64_t expected = (level0_time / 4) + RUN_TIME;
+//
+//    // 允许10%误差
+//    EXPECT_GT(level2_time, expected - expected/10);
+//    EXPECT_LT(level2_time, expected + expected/10);
+//}
+//
+//TEST_F(MultilevelSplitQueueTest, PriorityUpdate) {
+//    auto split = make_split(0, 0);
+//    queue->addSplit(split);
+//
+//    // 模拟运行30秒
+//    auto split_ptr = std::static_pointer_cast<TestSplitRunner>(queue->pollSplit());
+//    ASSERT_NE(split_ptr, nullptr);
+//    Priority new_pri = queue->updatePriority(
+//        split_ptr->getPriority(),
+//        30'000'000'000LL,  // 30秒
+//        0
+//    );
+//
+//    // 验证升级到level3
+//    EXPECT_EQ(new_pri.getLevel(), 3);
+//    EXPECT_GT(new_pri.getLevelPriority(), 30'000'000'000LL);
+//}
+//
+//TEST_F(MultilevelSplitQueueTest, LevelSelection) {
+//    // 准备不同级别的任务
+//    queue->addSplit(make_split(0, 100)); // level0
+//    queue->addSplit(make_split(3, 50));  // level3
+//
+//    // 多次轮询触发时间补偿
+//    for (int i = 0; i < 3; ++i) {
+//        auto task = queue->pollSplit();
+//        queue->addSplit(task);
+//    }
+//
+//    // 应优先选择level3
+//    auto task = queue->pollSplit();
+//    EXPECT_EQ(task->getPriority().getLevel(), 3);
+//}
+//
+//TEST_F(MultilevelSplitQueueTest, Concurrency) {
+//    constexpr int THREAD_NUM = 4;
+//    constexpr int OPS = 1000;
+//    std::vector<std::thread> workers;
+//
+//    for (int i = 0; i < THREAD_NUM; ++i) {
+//        workers.emplace_back([&] {
+//            for (int j = 0; j < OPS; ++j) {
+//                auto split = make_split(j % 5, j);
+//                queue->addSplit(split);
+//                if (auto task = queue->pollSplit()) {
+//                    // 模拟处理任务
+//                    std::this_thread::sleep_for(std::chrono::microseconds(10));
+//                }
+//            }
+//        });
+//    }
+//
+//    for (auto& t : workers) t.join();
+//    SUCCEED(); // 验证无崩溃
+//}
+//
+//TEST_F(MultilevelSplitQueueTest, Interrupt) {
+//    std::atomic<bool> stopped{false};
+//    std::thread consumer([&] {
+//        while (!stopped) {
+//            if (auto task = queue->pollSplit()) {
+//                // process task
+//            }
+//        }
+//    });
+//
+//    std::this_thread::sleep_for(std::chrono::milliseconds(10));
+//    queue->interrupt();
+//    stopped = true;
+//    consumer.join();
+//}
+//
+//} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/test/vec/exec/executor/time_sharing/split_concurrency_controller_test.cpp b/be/test/vec/exec/executor/time_sharing/split_concurrency_controller_test.cpp
new file mode 100644
index 0000000..58868a9
--- /dev/null
+++ b/be/test/vec/exec/executor/time_sharing/split_concurrency_controller_test.cpp
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/executor/time_sharing/split_concurrency_controller.h"
+
+#include <gtest/gtest.h>
+
+#include <chrono>
+
+namespace doris::vectorized {
+
+using namespace std::chrono_literals;
+
+// Empty fixture: each test constructs its own controller instance.
+class SplitConcurrencyControllerTest : public testing::Test {};
+
+// With zero utilization and long (2s) quanta, the controller should raise
+// its target concurrency by one on every update.
+TEST_F(SplitConcurrencyControllerTest, test_ramp_up) {
+    SplitConcurrencyController controller(1, std::chrono::seconds(1));
+
+    constexpr int64_t kTwoSecondsNanos = 2'000'000'000;
+    for (int iteration = 0; iteration < 10; ++iteration) {
+        const int current_concurrency = iteration + 1;
+        controller.update(kTwoSecondsNanos, 0.0, current_concurrency);
+        EXPECT_EQ(controller.target_concurrency(), current_concurrency + 1)
+                << "Rampup failed at iteration " << iteration;
+    }
+}
+
+// With full utilization, the controller should shed one unit of target
+// concurrency each time a split finishes.
+TEST_F(SplitConcurrencyControllerTest, test_ramp_down) {
+    SplitConcurrencyController controller(10, std::chrono::seconds(1));
+
+    for (int step = 0; step < 9; ++step) {
+        const int running = 10 - step;
+        controller.update(2'000'000'000, 1.0, running);
+        controller.split_finished(30'000'000'000, 1.0, running);
+        EXPECT_EQ(controller.target_concurrency(), running - 1)
+                << "Rampdown failed at iteration " << step;
+    }
+}
+
+// Short-lived splits should still drive rapid adjustment in both
+// directions.
+TEST_F(SplitConcurrencyControllerTest, test_rapid_adjust_for_quick_splits) {
+    SplitConcurrencyController controller(10, std::chrono::seconds(1));
+
+    // Rapid ramp-down: quick splits under full utilization drop the target
+    // by one per finished split.
+    for (int i = 0; i < 9; ++i) {
+        controller.update(200'000'000, 1.0, 10 - i);
+        controller.split_finished(100'000'000, 1.0, 10 - i);
+        EXPECT_EQ(controller.target_concurrency(), 10 - i - 1)
+                << "Rapid decrease failed at iteration " << i;
+    }
+
+    // Reset the adjustment state, then verify rapid ramp-up at zero
+    // utilization.
+    controller.update(30'000'000'000, 0.0, 1);
+    for (int i = 0; i < 10; ++i) {
+        controller.update(200'000'000, 0.0, i + 1);
+        controller.split_finished(100'000'000, 0.0, i + 1);
+        EXPECT_EQ(controller.target_concurrency(), i + 2)
+                << "Rapid increase failed at iteration " << i;
+    }
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/test/vec/exec/executor/time_sharing/time_sharing_task_executor_test.cpp b/be/test/vec/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
new file mode 100644
index 0000000..104c626
--- /dev/null
+++ b/be/test/vec/exec/executor/time_sharing/time_sharing_task_executor_test.cpp
@@ -0,0 +1,1238 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/executor/time_sharing/time_sharing_task_executor.h"
+
+#include <gtest/gtest.h>
+
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <functional>
+#include <future>
+#include <mutex>
+#include <random>
+#include <thread>
+
+#include "vec/exec/executor/ticker.h"
+#include "vec/exec/executor/time_sharing/time_sharing_task_handle.h"
+
+namespace doris {
+namespace vectorized {
+
+// A reusable synchronization barrier modeled after Java's Phaser: a fixed
+// number of parties arrive at each generation; when the last party arrives
+// the generation advances and all waiters are released. Parties can also be
+// registered and deregistered dynamically while the barrier is in use.
+class PhaseController {
+public:
+    explicit PhaseController(int parties) : _parties(parties), _count(parties), _generation(0) {}
+
+    // Arrives at the current generation and blocks until it completes.
+    // Returns the generation number in effect after the barrier trips.
+    int arrive_and_await() {
+        std::unique_lock<std::mutex> lock(_mutex);
+        int current_generation = _generation;
+        const int arrived_count = --_count;
+
+        if (arrived_count == 0) {
+            // Last arrival: reset the counter and advance to the next
+            // generation.
+            _count = _parties;
+            _generation++;
+            _cv.notify_all();
+        } else {
+            // Wait until the generation advances.
+            _cv.wait(lock, [this, current_generation] {
+                return _generation > current_generation || _count == 0;
+            });
+        }
+        return _generation;
+    }
+
+    // Arrives at the barrier and permanently removes this party, without
+    // blocking for the generation to complete.
+    void arrive_and_deregister() {
+        std::unique_lock<std::mutex> lock(_mutex);
+        _parties--; // permanently reduce the number of participants
+        const int arrived_count = --_count;
+
+        if (arrived_count == 0) {
+            // This arrival completes the generation: advance immediately.
+            _count = _parties;
+            _generation++;
+            _cv.notify_all();
+        } else {
+            // Do not block; just wake threads whose wait predicate may now
+            // be satisfied by the reduced party count.
+            _cv.notify_all();
+        }
+    }
+
+    // Adds a new party; it takes effect for the current generation.
+    void register_party() {
+        std::lock_guard<std::mutex> lock(_mutex);
+        _parties++;
+        _count++;         // new participant takes effect immediately
+        _cv.notify_all(); // wake any waiting threads
+        fprintf(stderr, "[PhaseController] 注册新参与者,当前总参与方=%d\n", _parties);
+    }
+
+    int parties() const {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return _parties;
+    }
+
+    int count() const {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return _count;
+    }
+
+    int generation() const {
+        std::lock_guard<std::mutex> lock(_mutex);
+        return _generation;
+    }
+
+private:
+    mutable std::mutex _mutex;
+    std::condition_variable _cv;
+    int _parties;
+    int _count;
+    int _generation; // generation counter, bumped when a phase completes
+};
+
+// Thread-safe Ticker for tests: time only advances when a test explicitly
+// calls increment().
+class TestingTicker final : public Ticker {
+public:
+    TestingTicker() : _time(0) {}
+
+    // Current time in nanoseconds.
+    int64_t read() const override { return _time.load(std::memory_order_relaxed); }
+
+    auto now() const { return _time.load(); }
+
+    // Advances the clock by delta * unit. Throws on a negative delta.
+    void increment(int64_t delta, std::chrono::nanoseconds unit) {
+        if (delta < 0) {
+            throw std::invalid_argument("delta is negative");
+        }
+        // fetch_add on the atomic is a single read-modify-write, so the
+        // extra mutex the original version held here was redundant.
+        _time.fetch_add(unit.count() * delta, std::memory_order_relaxed);
+    }
+
+private:
+    std::atomic<int64_t> _time;
+};
+
+// Test double for SplitRunner. Each process_for() call simulates one
+// scheduling quantum: it advances the test clock and walks through the
+// global/begin/end barrier phases so the test body can observe and control
+// progress. The split reports itself finished after required_phases quanta.
+class TestingSplitRunner : public SplitRunner {
+public:
+    // required_phases: quanta to run before completing.
+    // quanta_time_ms:  how far the ticker advances per process_for() call.
+    TestingSplitRunner(std::string name, std::shared_ptr<TestingTicker> ticker,
+                       std::shared_ptr<PhaseController> global_controller,
+                       std::shared_ptr<PhaseController> begin_controller,
+                       std::shared_ptr<PhaseController> end_controller, int required_phases,
+                       int quanta_time_ms)
+            : _name(std::move(name)),
+              _ticker(ticker),
+              _global_controller(global_controller),
+              _begin_controller(begin_controller),
+              _end_controller(end_controller),
+              _required_phases(required_phases),
+              _quanta_time(quanta_time_ms),
+              _completed_phases(0),
+              _first_phase(-1),
+              _last_phase(-1),
+              _started(false) {
+        _begin_controller->register_party();
+        _end_controller->register_party();
+
+        // The global controller is shared across splits; only the first
+        // split registers a party on it.
+        if (_global_controller->parties() == 0) {
+            _global_controller->register_party();
+        }
+    }
+
+    Status init() override { return Status::OK(); }
+
+    Result<SharedListenableFuture<Void>> process_for(std::chrono::nanoseconds) override {
+        _started = true;
+        _ticker->increment(_quanta_time, std::chrono::milliseconds(1));
+
+        // Global phase synchronization across all live splits.
+        _global_controller->arrive_and_await();
+
+        // Begin-quanta phase: record the first and last generations this
+        // split participated in. compare_exchange_strong is required here:
+        // the weak form may fail spuriously even when `expected` matches,
+        // and with no retry loop that would silently lose _first_phase.
+        int generation = _begin_controller->arrive_and_await();
+        int expected = -1;
+        _first_phase.compare_exchange_strong(expected, generation - 1, std::memory_order_relaxed);
+        _last_phase.store(generation, std::memory_order_relaxed);
+
+        // End-quanta phase.
+        _end_controller->arrive_and_await();
+
+        if (++_completed_phases >= _required_phases) {
+            // All phases done: leave the barriers so remaining splits are
+            // not blocked on this one, and complete the future.
+            _end_controller->arrive_and_deregister();
+            _begin_controller->arrive_and_deregister();
+            _global_controller->arrive_and_deregister();
+            _completion_future.set_value(Void {});
+        }
+
+        return SharedListenableFuture<Void>::create_ready();
+    }
+
+    int completed_phases() const { return _completed_phases; }
+
+    int first_phase() const { return _first_phase; }
+
+    int last_phase() const { return _last_phase; }
+
+    void close(const Status& status) override {
+        // No resources to release in this test double.
+    }
+
+    std::string get_info() const override { return ""; }
+
+    bool is_finished() override { return _completion_future.is_ready(); }
+
+    Status finished_status() override { return _completion_future.get_status(); }
+
+    bool is_started() const { return _started.load(); }
+
+private:
+    std::string _name;
+    std::shared_ptr<TestingTicker> _ticker;
+    std::shared_ptr<PhaseController> _global_controller;
+    std::shared_ptr<PhaseController> _begin_controller;
+    std::shared_ptr<PhaseController> _end_controller;
+    int _required_phases;
+    int _quanta_time;
+
+    std::atomic<int> _completed_phases;
+    std::atomic<int> _first_phase;
+    std::atomic<int> _last_phase;
+    std::atomic<bool> _started;
+    ListenableFuture<Void> _completion_future {};
+};
+
+class TimeSharingTaskExecutorTest : public testing::Test {
+protected:
+    void SetUp() override {}
+
+    void TearDown() override {}
+
+private:
+    // Template so different container types of split pointers are accepted.
+    // Asserts that splits[0..end_index] have started and the rest have not.
+    // NOTE(review): `splits.size() - 1` below underflows for an empty
+    // container, and the int/size_t loop comparisons are sign-mixed —
+    // harmless for the test sizes used, but worth tightening.
+    template <typename Container>
+    void assert_split_states(int end_index, const Container& splits) {
+        // Dump every split's started flag first for easier debugging.
+        fprintf(stderr, "[验证] 所有split启动状态总览(共%zu个):\n", splits.size());
+        for (int i = 0; i < splits.size(); ++i) {
+            fprintf(stderr, "  → split[%d] 启动状态: %s\n", i,
+                    splits[i]->is_started() ? "true" : "false");
+        }
+
+        // Splits up to and including end_index must have started.
+        fprintf(stderr, "\n[验证] 详细检查0-%d号split应已启动:\n", end_index);
+        for (int i = 0; i <= end_index; ++i) {
+            bool actual = splits[i]->is_started();
+            fprintf(stderr, "  → split[%d] 预期启动=true,实际=%s\n", i, actual ? "true" : "false");
+            EXPECT_TRUE(actual) << "Split " << i << " 应该已启动";
+        }
+
+        // Splits after end_index must not have started.
+        fprintf(stderr, "\n[验证] 详细检查%d-%zu号split应未启动:\n", end_index + 1,
+                splits.size() - 1);
+        for (int i = end_index + 1; i < splits.size(); ++i) {
+            bool actual = splits[i]->is_started();
+            fprintf(stderr, "  → split[%d] 预期启动=false,实际=%s\n", i,
+                    actual ? "true" : "false");
+            EXPECT_FALSE(actual) << "Split " << i << " 应该未启动";
+        }
+
+        fprintf(stderr, "[验证] 完整状态验证完成\n\n");
+    }
+
+    std::mt19937_64 _rng {std::random_device {}()};
+};
+
+// 辅助函数创建测试任务
+//std::shared_ptr<TestingSplitRunner> create_testing_split(
+//        std::shared_ptr<TestingTicker> ticker,
+//        std::shared_ptr<PhaseController> begin,
+//        std::shared_ptr<PhaseController> end,
+//        int required_phases) {
+//    return std::make_shared<TestingSplitRunner>(
+//            "driver", ticker,
+//            std::make_shared<PhaseController>(0), // global_controller
+//            begin,
+//            end,
+//            required_phases,
+//            0);
+//}
+
+// End-to-end check: splits run to completion in lock-step phases, and a
+// split added mid-way joins fairly and finishes after the first two.
+TEST_F(TimeSharingTaskExecutorTest, TestTasksComplete) {
+    auto ticker = std::make_shared<TestingTicker>();
+    //    const auto stuck_threshold = std::chrono::minutes(10);
+
+    TimeSharingTaskExecutor executor(4, 8, 3, 4, ticker);
+    ASSERT_TRUE(executor.init().ok());
+    ASSERT_TRUE(executor.start().ok());
+
+    try {
+        ticker->increment(20, std::chrono::milliseconds(1));
+        TaskId task_id("test_task");
+        auto task_handle = TEST_TRY(executor.create_task(
+                task_id, []() { return 0.0; }, 10, std::chrono::milliseconds(1), std::nullopt));
+
+        auto begin_phase = std::make_shared<PhaseController>(1);
+        auto verification_complete = std::make_shared<PhaseController>(1);
+
+        // Add two initial splits.
+        auto driver1 = std::make_shared<TestingSplitRunner>(
+                "driver1", ticker,
+                std::make_shared<PhaseController>(0), // global_controller
+                begin_phase, verification_complete, 10, 0);
+        auto driver2 = std::make_shared<TestingSplitRunner>(
+                "driver2", ticker,
+                std::make_shared<PhaseController>(0), // global_controller
+                begin_phase, verification_complete, 10, 0);
+        executor.enqueue_splits(task_handle, true, {driver1, driver2});
+
+        // Verify the initial state: nothing has run yet.
+        EXPECT_EQ(driver1->completed_phases(), 0);
+        EXPECT_EQ(driver2->completed_phases(), 0);
+
+        // Release the first phase.
+        begin_phase->arrive_and_await();
+        ticker->increment(60, std::chrono::seconds(1));
+
+        // Stuck-split detection (not yet wired up in this port).
+        //        auto stuck_splits = executor.get_stuck_split_task_ids(stuck_threshold, [](auto&){ return true; });
+        //        EXPECT_TRUE(stuck_splits.empty()) << "应无停滞任务";
+        //        EXPECT_EQ(executor.get_runaway_split_count(), 0) << "失控任务计数应为0";
+
+        // Advance far enough to trigger the split timeout.
+        ticker->increment(600, std::chrono::seconds(1));
+        //        stuck_splits = executor.get_stuck_split_task_ids(stuck_threshold, [](auto&){ return true; });
+        //        EXPECT_EQ(stuck_splits.size(), 1) << "应检测到1个停滞任务";
+        //        EXPECT_EQ(*stuck_splits.begin(), task_id) << "停滞任务ID不匹配";
+        //        EXPECT_EQ(executor.get_runaway_split_count(), 2) << "应检测到2个失控任务";
+        verification_complete->arrive_and_await();
+
+        // advance one phase and verify
+        begin_phase->arrive_and_await();
+        EXPECT_EQ(driver1->completed_phases(), 1);
+        EXPECT_EQ(driver2->completed_phases(), 1);
+        verification_complete->arrive_and_await();
+
+        // add one more job
+        auto driver3 = std::make_shared<TestingSplitRunner>(
+                "driver3", ticker,
+                std::make_shared<PhaseController>(0), // global_controller
+                begin_phase, verification_complete, 10, 0);
+        executor.enqueue_splits(task_handle, false, {driver3});
+        // advance one phase and verify
+        begin_phase->arrive_and_await();
+        EXPECT_EQ(driver1->completed_phases(), 2);
+        EXPECT_EQ(driver2->completed_phases(), 2);
+        EXPECT_EQ(driver3->completed_phases(), 0);
+        //        future1.get(1, SECONDS);
+        //        future2.get(1, SECONDS);
+        verification_complete->arrive_and_await();
+
+        // advance to the end of the first two task and verify
+        begin_phase->arrive_and_await();
+        for (int i = 0; i < 7; i++) {
+            verification_complete->arrive_and_await();
+            begin_phase->arrive_and_await();
+            EXPECT_EQ(begin_phase->generation(), verification_complete->generation() + 1);
+        }
+        EXPECT_EQ(driver1->completed_phases(), 10);
+        EXPECT_EQ(driver2->completed_phases(), 10);
+        EXPECT_EQ(driver3->completed_phases(), 8);
+        //        future3.get(1, SECONDS);
+        verification_complete->arrive_and_await();
+
+        // advance two more times and verify
+        begin_phase->arrive_and_await();
+        verification_complete->arrive_and_await();
+        begin_phase->arrive_and_await();
+        EXPECT_EQ(driver1->completed_phases(), 10);
+        EXPECT_EQ(driver2->completed_phases(), 10);
+        EXPECT_EQ(driver3->completed_phases(), 10);
+        //        future3.get(1, SECONDS);
+        verification_complete->arrive_and_await();
+        //
+        EXPECT_EQ(driver1->first_phase(), 0);
+        EXPECT_EQ(driver2->first_phase(), 0);
+        EXPECT_EQ(driver3->first_phase(), 2);
+        EXPECT_EQ(driver1->last_phase(), 10);
+        EXPECT_EQ(driver2->last_phase(), 10);
+        EXPECT_EQ(driver3->last_phase(), 12);
+
+        // After everything finished, no split should be flagged as stuck.
+        ticker->increment(610, std::chrono::seconds(1));
+        //        stuck_splits = executor.get_stuck_split_task_ids(stuck_threshold, [](auto&){ return true; });
+        //        EXPECT_TRUE(stuck_splits.empty()) << "最终应无停滞任务";
+        //        EXPECT_EQ(executor.get_runaway_split_count(), 0) << "最终失控任务计数应为0";
+    } catch (...) {
+        executor.stop();
+        throw;
+    }
+    executor.stop();
+}
+
+// Verifies quanta-based fairness on a single task thread: the split running
+// shorter quanta (10) should complete roughly twice as many phases as the
+// split running longer quanta (20) out of TOTAL_PHASES overall.
+TEST_F(TimeSharingTaskExecutorTest, TestQuantaFairness) {
+    constexpr int TOTAL_PHASES = 11;
+    std::shared_ptr<TestingTicker> ticker = std::make_shared<TestingTicker>();
+
+    TimeSharingTaskExecutor executor(1, 2, 3, 4, ticker);
+    ASSERT_TRUE(executor.init().ok());
+    ASSERT_TRUE(executor.start().ok());
+
+    ticker->increment(20, std::chrono::milliseconds(1));
+    // Create two independent task handles.
+    TaskId short_task_id("short_quanta");
+    TaskId long_task_id("long_quanta");
+    auto short_handle = TEST_TRY(executor.create_task(
+            short_task_id, []() { return 0.0; }, // priority function
+            10,                                  // initial concurrency
+            std::chrono::milliseconds(1), std::nullopt));
+
+    auto long_handle = TEST_TRY(executor.create_task(
+            long_task_id, []() { return 0.0; }, 10, std::chrono::milliseconds(1), std::nullopt));
+
+    auto end_controller = std::make_shared<PhaseController>(0);
+
+    // Two splits sharing end_controller; the "long" split's quanta time (20)
+    // is double the "short" split's (10).
+    auto short_split = std::make_shared<TestingSplitRunner>(
+            "short", ticker, std::make_shared<PhaseController>(0),
+            std::make_shared<PhaseController>(0), end_controller, TOTAL_PHASES, 10);
+
+    auto long_split = std::make_shared<TestingSplitRunner>(
+            "long", ticker, std::make_shared<PhaseController>(0),
+            std::make_shared<PhaseController>(0), end_controller, TOTAL_PHASES, 20);
+
+    // Enqueue each split on its corresponding handle.
+    executor.enqueue_splits(short_handle, true, {short_split});
+    executor.enqueue_splits(long_handle, true, {long_split});
+
+    // Main-thread synchronization: rendezvous once per completed phase.
+    for (int i = 0; i < TOTAL_PHASES; ++i) {
+        end_controller->arrive_and_await();
+    }
+
+    //    fprintf(stderr, "验证阶段数\n");
+
+    // Verify phase counts: short-quanta split gets ~2x the phases (7-8 vs 3-4).
+    EXPECT_GE(short_split->completed_phases(), 7);
+    EXPECT_LE(short_split->completed_phases(), 8);
+    EXPECT_GE(long_split->completed_phases(), 3);
+    EXPECT_LE(long_split->completed_phases(), 4);
+    end_controller->arrive_and_deregister();
+    executor.stop();
+}
+
+// Verifies level movement: as a task accumulates scheduled time, its handle's
+// priority level should advance through each MultilevelSplitQueue threshold.
+TEST_F(TimeSharingTaskExecutorTest, TestLevelMovement) {
+    std::shared_ptr<TestingTicker> ticker = std::make_shared<TestingTicker>();
+    TimeSharingTaskExecutor executor(2, 2, 3, 4, ticker);
+    ASSERT_TRUE(executor.init().ok());
+    ASSERT_TRUE(executor.start().ok());
+
+    try {
+        // Initialize the ticker time.
+        ticker->increment(20, std::chrono::milliseconds(1));
+
+        // Create the task handle.
+        TaskId task_id("test_task");
+        auto task_handle =
+                std::static_pointer_cast<TimeSharingTaskHandle>(TEST_TRY(executor.create_task(
+                        task_id, []() { return 0.0; }, // priority function
+                        10,                            // initial concurrency
+                        std::chrono::milliseconds(1), std::nullopt)));
+
+        // Synchronization controller: 2 executor threads + main thread.
+        auto global_controller = std::make_shared<PhaseController>(3);
+        const int quanta_time_ms = 500;
+        const int phases_per_second = 1000 / quanta_time_ms;
+        // Enough phases to reach the last level threshold.
+        const int total_phases =
+                MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS
+                        [MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS.size() - 1] *
+                phases_per_second;
+
+        // Create the test splits.
+        auto driver1 = std::make_shared<TestingSplitRunner>(
+                "driver1", ticker, global_controller, std::make_shared<PhaseController>(0),
+                std::make_shared<PhaseController>(0), total_phases, quanta_time_ms);
+        auto driver2 = std::make_shared<TestingSplitRunner>(
+                "driver2", ticker, global_controller, std::make_shared<PhaseController>(0),
+                std::make_shared<PhaseController>(0), total_phases, quanta_time_ms);
+
+        // Enqueue the splits.
+        executor.enqueue_splits(task_handle, false, {driver1, driver2});
+
+        int completed_phases = 0;
+        // size_t loop index avoids a signed/unsigned comparison against size().
+        for (size_t i = 0; i < MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS.size() - 1; ++i) {
+            const int target_seconds = MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS[i + 1];
+            // Run phases until enough simulated seconds elapse to cross the
+            // next level threshold.
+            while ((completed_phases / phases_per_second) < target_seconds) {
+                global_controller->arrive_and_await();
+                ++completed_phases;
+            }
+
+            // Verify the task was promoted to the next priority level.
+            EXPECT_EQ(task_handle->priority().level(), static_cast<int>(i) + 1);
+        }
+
+        global_controller->arrive_and_deregister();
+    } catch (...) {
+        executor.stop();
+        throw;
+    }
+    executor.stop();
+}
+
+// Verifies level multipliers: after tasks are seeded onto different priority
+// levels, splits on the lower level should accumulate roughly twice the
+// execution time of splits one level higher (multiplier 2 given to the queue).
+TEST_F(TimeSharingTaskExecutorTest, TestLevelMultipliers) {
+    constexpr int TASK_THREADS = 6;
+    constexpr int CONCURRENCY = 3;
+
+    std::shared_ptr<TestingTicker> ticker = std::make_shared<TestingTicker>();
+    std::shared_ptr<MultilevelSplitQueue> split_queue =
+            std::make_shared<MultilevelSplitQueue>(2); // initialize the multilevel queue
+    TimeSharingTaskExecutor executor(TASK_THREADS, CONCURRENCY, 3, 4, ticker,
+                                     std::chrono::milliseconds(60000), split_queue);
+    ASSERT_TRUE(executor.init().ok());
+    ASSERT_TRUE(executor.start().ok());
+
+    try {
+        ticker->increment(20, std::chrono::milliseconds(1));
+
+        // Use .size() instead of sizeof arithmetic: LEVEL_THRESHOLD_SECONDS is
+        // a container (it is used with .size() elsewhere in this file), so
+        // sizeof(...) / sizeof(int) is fragile; size_t also avoids a
+        // signed/unsigned comparison.
+        for (size_t i = 0; i < MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS.size() - 1; ++i) {
+            // Create three task handles.
+            TaskId task_id1("test1"), task_id2("test2"), task_id3("test3");
+            auto task_handle1 = TEST_TRY(executor.create_task(
+                    task_id1, []() { return 0.0; }, 10, std::chrono::milliseconds(1),
+                    std::nullopt));
+            auto task_handle2 = TEST_TRY(executor.create_task(
+                    task_id2, []() { return 0.0; }, 10, std::chrono::milliseconds(1),
+                    std::nullopt));
+            auto task_handle3 = TEST_TRY(executor.create_task(
+                    task_id3, []() { return 0.0; }, 10, std::chrono::milliseconds(1),
+                    std::nullopt));
+
+            // Move task 0 to the next level (i + 1) by consuming that much time.
+            auto task0_job = std::make_shared<TestingSplitRunner>(
+                    "task0", ticker, std::make_shared<PhaseController>(0),
+                    std::make_shared<PhaseController>(0), std::make_shared<PhaseController>(0), 1,
+                    MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS[i + 1] * 1000);
+            executor.enqueue_splits(task_handle1, false, {task0_job});
+
+            // Move tasks 1 and 2 to the current level (i).
+            auto task1_job = std::make_shared<TestingSplitRunner>(
+                    "task1", ticker, std::make_shared<PhaseController>(0),
+                    std::make_shared<PhaseController>(0), std::make_shared<PhaseController>(0), 1,
+                    MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS[i] * 1000);
+            auto task2_job = std::make_shared<TestingSplitRunner>(
+                    "task2", ticker, std::make_shared<PhaseController>(0),
+                    std::make_shared<PhaseController>(0), std::make_shared<PhaseController>(0), 1,
+                    MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS[i] * 1000);
+            executor.enqueue_splits(task_handle2, false, {task1_job});
+            executor.enqueue_splits(task_handle3, false, {task2_job});
+
+            // Wait for the level-seeding splits to finish.
+            while (!task0_job->is_finished() || !task1_job->is_finished() ||
+                   !task2_job->is_finished()) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(10));
+            }
+
+            // Start the measured splits: 6 executor threads + main thread.
+            auto global_controller = std::make_shared<PhaseController>(7);
+            const int phases_for_next_level = MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS[i + 1] -
+                                              MultilevelSplitQueue::LEVEL_THRESHOLD_SECONDS[i];
+            std::array<std::shared_ptr<TestingSplitRunner>, 6> drivers;
+
+            for (int j = 0; j < 6; ++j) {
+                drivers[j] = std::make_shared<TestingSplitRunner>(
+                        "driver" + std::to_string(j), ticker, global_controller,
+                        std::make_shared<PhaseController>(0), std::make_shared<PhaseController>(0),
+                        phases_for_next_level, 1000);
+            }
+
+            executor.enqueue_splits(task_handle1, false, {drivers[0], drivers[1]});
+            executor.enqueue_splits(task_handle2, false, {drivers[2], drivers[3]});
+            executor.enqueue_splits(task_handle3, false, {drivers[4], drivers[5]});
+
+            // Capture baseline phase counts for both level groups.
+            int lower_level_start = 0, higher_level_start = 0;
+            for (int j = 2; j < 6; ++j) lower_level_start += drivers[j]->completed_phases();
+            for (int j = 0; j < 2; ++j) higher_level_start += drivers[j]->completed_phases();
+
+            // Run all splits, checking the time ratio after every phase.
+            while (std::any_of(drivers.begin(), drivers.end(),
+                               [](auto& d) { return !d->is_finished(); })) {
+                global_controller->arrive_and_await();
+
+                int lower_level_end = 0, higher_level_end = 0;
+                for (int j = 2; j < 6; ++j) lower_level_end += drivers[j]->completed_phases();
+                for (int j = 0; j < 2; ++j) higher_level_end += drivers[j]->completed_phases();
+
+                int lower_level_time = lower_level_end - lower_level_start;
+                int higher_level_time = higher_level_end - higher_level_start;
+
+                if (higher_level_time > 20) {
+                    // Expect the lower level to get about 2x the higher level's time.
+                    // NOTE(review): the second bound is much weaker than the first
+                    // (it already holds whenever lower >= higher); confirm whether
+                    // EXPECT_LT(lower_level_time, higher_level_time * 2 + 10) was intended.
+                    EXPECT_GT(lower_level_time, higher_level_time * 2 - 10);
+                    EXPECT_LT(higher_level_time, lower_level_time * 2 + 10);
+                }
+            }
+
+            global_controller->arrive_and_deregister();
+            static_cast<void>(executor.remove_task(task_handle1));
+            static_cast<void>(executor.remove_task(task_handle2));
+            static_cast<void>(executor.remove_task(task_handle3));
+        }
+    } catch (...) {
+        executor.stop();
+        throw;
+    }
+    executor.stop();
+}
+
+// Verifies TimeSharingTaskHandle split accounting: a force-enqueued split does
+// not count toward running leaf splits, while a normally-enqueued one does.
+TEST_F(TimeSharingTaskExecutorTest, TestTaskHandle) {
+    std::shared_ptr<TestingTicker> ticker = std::make_shared<TestingTicker>();
+    TimeSharingTaskExecutor executor(4, 8, 3, 4, ticker);
+    ASSERT_TRUE(executor.init().ok());
+    ASSERT_TRUE(executor.start().ok());
+
+    try {
+        // Create the task handle.
+        TaskId task_id("test_task");
+        auto task_handle =
+                std::static_pointer_cast<TimeSharingTaskHandle>(TEST_TRY(executor.create_task(
+                        task_id, []() { return 0.0; }, 10, std::chrono::milliseconds(1),
+                        std::nullopt)));
+
+        // Prepare synchronization controllers.
+        auto begin_phase = std::make_shared<PhaseController>(1);
+        auto verification_complete = std::make_shared<PhaseController>(1);
+
+        // Create the test splits.
+        auto driver1 = std::make_shared<TestingSplitRunner>(
+                "driver1", ticker,
+                std::make_shared<PhaseController>(0), // global_controller
+                begin_phase, verification_complete,
+                10, // required_phases
+                0); // quanta_time
+
+        auto driver2 = std::make_shared<TestingSplitRunner>(
+                "driver2", ticker,
+                std::make_shared<PhaseController>(0), // global_controller
+                begin_phase, verification_complete,
+                10, // required_phases
+                0); // quanta_time
+
+        //        // 强制入队split(不立即执行)
+        //        executor.enqueue_splits(task_handle, true, {driver1});
+        //        EXPECT_EQ(task_handle->get_running_leaf_splits(), 0)
+        //                << "强制入队后应立即运行的任务数应为0";
+        //
+        //        // 正常入队split
+        //        executor.enqueue_splits(task_handle, false, {driver2});
+        //        EXPECT_EQ(task_handle->get_running_leaf_splits(), 1)
+        //                << "正常入队后应立即运行的任务数应为1";
+
+        //        // 正常入队split
+        //        executor.enqueue_splits(task_handle, false, {driver2});
+        //        EXPECT_EQ(task_handle->get_running_leaf_splits(), 2)
+        //                << "正常入队后应立即运行的任务数应为2";
+
+        // Force-enqueue a split: it must not start running immediately.
+        executor.enqueue_splits(task_handle, true, {driver1});
+        EXPECT_EQ(task_handle->running_leaf_splits(), 0) << "强制入队后应立即运行的任务数应为0";
+
+        // Enqueue a split normally: it should be running right away.
+        executor.enqueue_splits(task_handle, false, {driver2});
+        EXPECT_EQ(task_handle->running_leaf_splits(), 1) << "正常入队后应立即运行的任务数应为1";
+
+        // Release the phase controllers so the splits can run to completion.
+        begin_phase->arrive_and_deregister();
+        verification_complete->arrive_and_deregister();
+
+        // Wait for both splits to finish.
+        while (!driver1->is_finished() || !driver2->is_finished()) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        }
+    } catch (...) {
+        executor.stop();
+        throw;
+    }
+    executor.stop();
+}
+
+// Helper (shared by the tests below): blocks until every split in `splits`
+// reports is_started(), polling every 10ms; throws std::runtime_error once a
+// 30-second deadline has passed.
+void wait_until_splits_start(const std::vector<std::shared_ptr<TestingSplitRunner>>& splits) {
+    constexpr auto TIMEOUT = std::chrono::seconds(30);
+    const auto deadline = std::chrono::steady_clock::now() + TIMEOUT;
+
+    auto all_started = [&splits] {
+        return std::all_of(splits.begin(), splits.end(),
+                           [](const auto& split) { return split->is_started(); });
+    };
+    while (!all_started()) {
+        if (std::chrono::steady_clock::now() > deadline) {
+            throw std::runtime_error("等待split启动超时");
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+}
+
+// Verifies that max-drivers-per-task (2) caps concurrent splits: 8 splits
+// enqueued up front must execute as 4 sequential batches of 2.
+TEST_F(TimeSharingTaskExecutorTest, TestMinMaxDriversPerTask) {
+    constexpr int MAX_DRIVERS_PER_TASK = 2;
+    constexpr int TASK_THREADS = 4;
+    constexpr int CONCURRENCY = 16;
+    constexpr int BATCH_COUNT = 4;
+    constexpr int SPLITS_PER_BATCH = 2;
+
+    std::shared_ptr<TestingTicker> ticker = std::make_shared<TestingTicker>();
+    std::shared_ptr<MultilevelSplitQueue> split_queue =
+            std::make_shared<MultilevelSplitQueue>(2); // initialize the multilevel queue
+    TimeSharingTaskExecutor executor(TASK_THREADS, CONCURRENCY, 1, MAX_DRIVERS_PER_TASK, ticker,
+                                     std::chrono::milliseconds(60000), split_queue);
+    ASSERT_TRUE(executor.init().ok());
+    ASSERT_TRUE(executor.start().ok());
+
+    try {
+        TaskId task_id("test_task");
+        auto task_handle = TEST_TRY(executor.create_task(
+                task_id, []() { return 0.0; }, 10, std::chrono::milliseconds(1), std::nullopt));
+
+        std::array<std::shared_ptr<TestingSplitRunner>, BATCH_COUNT * SPLITS_PER_BATCH> splits;
+        std::array<std::shared_ptr<PhaseController>, BATCH_COUNT> batch_controllers;
+
+        // Prepare 4 batches of splits, 2 splits per batch.
+        for (int batch = 0; batch < BATCH_COUNT; ++batch) {
+            batch_controllers[batch] = std::make_shared<PhaseController>(0);
+            batch_controllers[batch]->register_party();
+            auto& controller = batch_controllers[batch];
+
+            for (int i = 0; i < SPLITS_PER_BATCH; ++i) {
+                splits[batch * SPLITS_PER_BATCH + i] = std::make_shared<TestingSplitRunner>(
+                        "batch" + std::to_string(batch) + "_split" + std::to_string(i), ticker,
+                        std::make_shared<PhaseController>(0), // global_controller
+                        std::make_shared<PhaseController>(0), // begin_controller
+                        controller,                           // end_controller
+                        1,                                    // required_phases
+                        0);                                   // quanta_time
+            }
+
+            executor.enqueue_splits(
+                    task_handle, false,
+                    {splits[batch * SPLITS_PER_BATCH], splits[batch * SPLITS_PER_BATCH + 1]});
+        }
+
+        //        fprintf(stderr, "wait and validation.\n");
+        // Verify batch-by-batch execution.
+        for (int current_batch = 0; current_batch < BATCH_COUNT; ++current_batch) {
+            // Wait for the current batch's splits to start.
+            //            fprintf(stderr, "wait_until_splits_start batch: %d\n", current_batch);
+            wait_until_splits_start({splits[current_batch * SPLITS_PER_BATCH],
+                                     splits[current_batch * SPLITS_PER_BATCH + 1]});
+            //            fprintf(stderr, "wait_until_splits_start batch finished: %d\n", current_batch);
+
+            // Assert which splits have started/finished up to this batch.
+            assert_split_states(current_batch * SPLITS_PER_BATCH + 1, splits);
+
+            // Complete the current batch.
+            batch_controllers[current_batch]->arrive_and_deregister();
+        }
+
+        // Verify every split eventually finishes.
+        for (auto& split : splits) {
+            while (!split->is_finished()) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(10));
+            }
+        }
+    } catch (...) {
+        executor.stop();
+        throw;
+    }
+    executor.stop();
+}
+
+// Verifies that a user-supplied max-drivers-per-task of 1 overrides the
+// executor-level default (4): four splits must run strictly one at a time.
+TEST_F(TimeSharingTaskExecutorTest, TestUserSpecifiedMaxDriversPerTask) {
+    constexpr int TASK_THREADS = 4;
+    constexpr int CONCURRENCY = 16;
+    constexpr int BATCH_COUNT = 4;
+
+    std::shared_ptr<TestingTicker> ticker = std::make_shared<TestingTicker>();
+    std::shared_ptr<MultilevelSplitQueue> split_queue = std::make_shared<MultilevelSplitQueue>(2);
+    TimeSharingTaskExecutor executor(TASK_THREADS, CONCURRENCY, 2, 4, ticker,
+                                     std::chrono::milliseconds(60000), split_queue);
+    ASSERT_TRUE(executor.init().ok());
+    ASSERT_TRUE(executor.start().ok());
+
+    try {
+        // Create a task, overriding max drivers per task to 1.
+        TaskId task_id("user_specified_task");
+        auto task_handle = TEST_TRY(executor.create_task(
+                task_id, []() { return 0.0; }, 10, std::chrono::milliseconds(1),
+                std::optional<int>(1))); // explicitly specified max drivers
+
+        std::array<std::shared_ptr<TestingSplitRunner>, BATCH_COUNT> splits;
+        std::array<std::shared_ptr<PhaseController>, BATCH_COUNT> batch_controllers;
+
+        // Prepare 4 batches of one split each.
+        for (int batch = 0; batch < BATCH_COUNT; ++batch) {
+            batch_controllers[batch] = std::make_shared<PhaseController>(0);
+            batch_controllers[batch]->register_party();
+
+            splits[batch] = std::make_shared<TestingSplitRunner>(
+                    "batch" + std::to_string(batch), ticker,
+                    std::make_shared<PhaseController>(0), // global_controller
+                    std::make_shared<PhaseController>(0), // begin_controller
+                    batch_controllers[batch],             // end_controller
+                    1,                                    // required_phases
+                    0);                                   // quanta_time
+
+            executor.enqueue_splits(task_handle, false, {splits[batch]});
+        }
+
+        // Verify batch-by-batch execution.
+        for (int current_batch = 0; current_batch < BATCH_COUNT; ++current_batch) {
+            // Wait for the current batch's split to start.
+            wait_until_splits_start({splits[current_batch]});
+
+            // Verify the state of splits up to the current batch.
+            assert_split_states(current_batch, splits);
+
+            // Complete the current batch.
+            batch_controllers[current_batch]->arrive_and_deregister();
+        }
+
+        // Verify every split eventually finishes.
+        for (auto& split : splits) {
+            while (!split->is_finished()) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(10));
+            }
+        }
+    } catch (...) {
+        executor.stop();
+        throw;
+    }
+    executor.stop();
+}
+
+// Verifies that finishing a split raises the task's target concurrency so the
+// remaining queued splits get scheduled: with initial concurrency 1, only the
+// first split runs; once it finishes, the other two are allowed to start.
+TEST_F(TimeSharingTaskExecutorTest, TestMinDriversPerTaskWhenTargetConcurrencyIncreases) {
+    constexpr int TASK_THREADS = 4;
+    constexpr int CONCURRENCY = 1;
+    constexpr int BATCH_COUNT = 3;
+
+    std::shared_ptr<TestingTicker> ticker = std::make_shared<TestingTicker>();
+    std::shared_ptr<MultilevelSplitQueue> split_queue = std::make_shared<MultilevelSplitQueue>(2);
+    TimeSharingTaskExecutor executor(TASK_THREADS, CONCURRENCY, 2, 2, ticker,
+                                     std::chrono::milliseconds(60000), split_queue);
+    ASSERT_TRUE(executor.init().ok());
+    ASSERT_TRUE(executor.start().ok());
+
+    try {
+        // Create a task with an explicitly specified minimum concurrency.
+        TaskId task_id("test_query");
+        auto task_handle = TEST_TRY(executor.create_task(
+                task_id, []() { return 0.0; }, // keep reported utilization low
+                1,                             // initial concurrency
+                std::chrono::milliseconds(1),
+                std::optional<int>(2))); // explicitly specified minimum concurrency
+
+        std::array<std::shared_ptr<TestingSplitRunner>, BATCH_COUNT> splits;
+        std::array<std::shared_ptr<PhaseController>, BATCH_COUNT> batch_controllers;
+
+        // Create 3 splits.
+        for (int batch = 0; batch < BATCH_COUNT; ++batch) {
+            batch_controllers[batch] = std::make_shared<PhaseController>(0);
+            batch_controllers[batch]->register_party();
+
+            splits[batch] = std::make_shared<TestingSplitRunner>(
+                    "split" + std::to_string(batch), ticker,
+                    std::make_shared<PhaseController>(0), // global_controller
+                    std::make_shared<PhaseController>(0), // begin_controller
+                    batch_controllers[batch],             // end_controller
+                    1,                                    // required_phases
+                    0);                                   // quanta_time
+        }
+
+        // Enqueue all splits at once.
+        executor.enqueue_splits(task_handle, false, {splits[0], splits[1], splits[2]});
+
+        // Wait for the first split to start.
+        wait_until_splits_start({splits[0]});
+
+        // Verify only the first split has started.
+        assert_split_states(0, splits);
+
+        // Finish the first split (should trigger a concurrency adjustment).
+        batch_controllers[0]->arrive_and_deregister();
+
+        // Wait for the remaining splits to start.
+        wait_until_splits_start({splits[1], splits[2]});
+
+        // Verify all splits have started.
+        assert_split_states(2, splits);
+
+        // Finish the remaining splits.
+        for (int batch = 1; batch < BATCH_COUNT; ++batch) {
+            batch_controllers[batch]->arrive_and_deregister();
+        }
+
+        // Verify every split finishes.
+        for (auto& split : splits) {
+            while (!split->is_finished()) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(10));
+            }
+        }
+    } catch (...) {
+        executor.stop();
+        throw;
+    }
+    executor.stop();
+}
+
+// TODO: implement — empty placeholder for verifying leaf-splits size accounting.
+TEST_F(TimeSharingTaskExecutorTest, TestLeafSplitsSize) {}
+
+//TEST_F(TimeSharingTaskExecutorTest, TaskRotation) {
+//    constexpr int TASK_THREADS = 4;
+//    constexpr int CONCURRENCY = 1;
+//    constexpr int BATCH_COUNT = 3;
+//
+//    std::shared_ptr<TestingTicker> ticker = std::make_shared<TestingTicker>();
+//    std::shared_ptr<MultilevelSplitQueue> split_queue = std::make_shared<MultilevelSplitQueue>(2);
+//
+//    TimeSharingTaskExecutor executor(TASK_THREADS, CONCURRENCY, 2, 2, ticker,
+//                                     std::chrono::milliseconds(60000), split_queue);
+//    ASSERT_TRUE(executor.init().ok());
+//    ASSERT_TRUE(executor.start().ok());
+//    // 创建三个任务
+//    auto task1 = std::make_shared<TimeSharingTaskHandle>(
+//            "task1", executor->_waiting_splits, []() { return 0.5; }, 1,
+//            std::chrono::milliseconds(100), 2);
+//    auto task2 = std::make_shared<TimeSharingTaskHandle>(
+//            "task2", executor->_waiting_splits, []() { return 0.5; }, 1,
+//            std::chrono::milliseconds(100), 2);
+//    auto task3 = std::make_shared<TimeSharingTaskHandle>(
+//            "task3", executor->_waiting_splits, []() { return 0.5; }, 1,
+//            std::chrono::milliseconds(100), 2);
+//
+//    // 添加任务到执行器
+//    executor->add_task("task1", task1);
+//    executor->add_task("task2", task2);
+//    executor->add_task("task3", task3);
+//
+//    // 添加分片到任务
+//    auto split1 = std::make_shared<MockSplitRunner>();
+//    auto split2 = std::make_shared<MockSplitRunner>();
+//    auto split3 = std::make_shared<MockSplitRunner>();
+//    task1->enqueue_split(split1);
+//    task2->enqueue_split(split2);
+//    task3->enqueue_split(split3);
+//
+//    // 测试任务轮转
+//    {
+//        std::lock_guard<std::mutex> guard(executor->_mutex);
+//
+//        // 第一次轮询,应该返回 task1 的分片
+//        auto result1 = executor->_poll_next_split_worker(guard);
+//        EXPECT_EQ(result1, split1);
+//
+//        // 第二次轮询,应该返回 task2 的分片
+//        auto result2 = executor->_poll_next_split_worker(guard);
+//        EXPECT_EQ(result2, split2);
+//
+//        // 第三次轮询,应该返回 task3 的分片
+//        auto result3 = executor->_poll_next_split_worker(guard);
+//        EXPECT_EQ(result3, split3);
+//
+//        // 第四次轮询,应该再次返回 task1 的分片(轮转)
+//        auto result4 = executor->_poll_next_split_worker(guard);
+//        EXPECT_EQ(result4, split1);
+//    }
+//
+//    // 测试任务轮转时跳过没有分片的任务
+//    {
+//        std::lock_guard<std::mutex> guard(executor->_mutex);
+//
+//        // 清空 task2 的分片
+//        task2->set_running_leaf_splits(2);
+//
+//        // 第一次轮询,应该返回 task3 的分片(跳过 task2)
+//        auto result1 = executor->_poll_next_split_worker(guard);
+//        EXPECT_EQ(result1, split3);
+//
+//        // 第二次轮询,应该返回 task1 的分片(跳过 task2)
+//        auto result2 = executor->_poll_next_split_worker(guard);
+//        EXPECT_EQ(result2, split1);
+//    }
+//}
+
+// Fixture for PhaseController tests: recreates a controller registered with
+// THREAD_COUNT parties before every test.
+class PhaseControllerTest : public testing::Test {
+protected:
+    const int THREAD_COUNT = 4;
+    // Controller under test; reset in SetUp so each test starts at generation 0.
+    std::shared_ptr<PhaseController> controller;
+
+    void SetUp() override { controller = std::make_shared<PhaseController>(THREAD_COUNT); }
+};
+
+// Basic synchronization test: THREAD_COUNT threads rendezvous for several
+// generations; each arrive_and_await call returns the new generation number.
+TEST_F(PhaseControllerTest, BasicSynchronization) {
+    std::atomic<int> ready {0};
+    std::vector<std::thread> threads;
+    const int GENERATIONS = 3;
+
+    for (int g = 0; g < GENERATIONS; ++g) {
+        for (int i = 0; i < THREAD_COUNT; ++i) {
+            threads.emplace_back([&, g] {
+                // Spin-barrier so all threads of this generation start together.
+                if (ready.fetch_add(1) == THREAD_COUNT - 1) {
+                    ready.store(0);
+                } else {
+                    while (ready.load() != 0)
+                        ;
+                }
+
+                int gen = controller->arrive_and_await();
+                EXPECT_EQ(gen, g + 1) << "代际计数错误";
+            });
+        }
+
+        for (auto& t : threads) t.join();
+        threads.clear();
+
+        // After each generation the counter resets to the full party count.
+        EXPECT_EQ(controller->generation(), g + 1);
+        EXPECT_EQ(controller->count(), THREAD_COUNT) << "每代应重置计数器";
+    }
+}
+
+// Dynamic participant management test: parties registered mid-flight join the
+// next generation's synchronization and are counted in parties().
+TEST_F(PhaseControllerTest, DynamicRegistration) {
+    constexpr int NEW_PARTIES = 2;
+    std::atomic<int> completed {0};
+    std::vector<std::thread> threads;
+    std::mutex cv_mtx;
+    std::condition_variable cv;
+
+    // Rebuild the controller with the main thread included as a party.
+    controller = std::make_shared<PhaseController>(THREAD_COUNT + 1);
+
+    // Initial participants (4 threads + the main thread).
+    for (int i = 0; i < THREAD_COUNT; ++i) {
+        threads.emplace_back([&] {
+            // Phase 1 synchronization.
+            controller->arrive_and_await();
+
+            // Phase 2 synchronization.
+            controller->arrive_and_await();
+
+            completed++;
+        });
+    }
+
+    // Main thread joins phase 1; the returned generation is only needed when
+    // debugging, so silence the unused-but-set warning.
+    [[maybe_unused]] int main_gen = controller->arrive_and_await();
+    // Leftover debug print disabled to keep test output clean, consistent with
+    // the other commented-out traces in this test.
+    //    fprintf(stderr, "第一阶段完成,当前代际=%d\n", main_gen);
+
+    // Newly registered participants (2 threads).
+    std::vector<std::thread> new_threads;
+    for (int i = 0; i < NEW_PARTIES; ++i) {
+        new_threads.emplace_back([&] {
+            {
+                std::lock_guard<std::mutex> lk(cv_mtx);
+                controller->register_party();
+            }
+            cv.notify_one();
+
+            // New parties only take part in the phase 2 synchronization.
+            controller->arrive_and_await();
+            completed++;
+        });
+    }
+
+    // Wait until every new thread has completed registration.
+    {
+        std::unique_lock<std::mutex> lk(cv_mtx);
+        cv.wait(lk, [&] { return controller->parties() == THREAD_COUNT + 1 + NEW_PARTIES; });
+    }
+
+    // Main thread triggers the phase 2 synchronization.
+    main_gen = controller->arrive_and_await();
+
+    // Wait for all threads to finish.
+    for (auto& t : threads) t.join();
+    for (auto& t : new_threads) t.join();
+
+    // Verify results: every worker completed and all registrations stuck.
+    EXPECT_EQ(completed.load(), THREAD_COUNT + NEW_PARTIES);
+    EXPECT_EQ(controller->parties(), THREAD_COUNT + 1 + NEW_PARTIES);
+}
+
+// // 注销参与者测试
+// TEST_F(PhaseControllerTest, DeregistrationTest) {
+//     std::vector<std::thread> threads;
+
+//     // 第一个线程注销
+//     threads.emplace_back([&] {
+//         controller->arrive_and_deregister();
+//         EXPECT_EQ(controller->parties(), THREAD_COUNT - 1);
+//     });
+
+//     // 其他线程正常完成
+//     for (int i = 1; i < THREAD_COUNT; ++i) {
+//         threads.emplace_back([&] {
+//             controller->arrive_and_await();
+//         });
+//     }
+
+//     for (auto& t : threads) t.join();
+
+//     EXPECT_EQ(controller->parties(), THREAD_COUNT - 1);
+//     EXPECT_EQ(controller->count(), THREAD_COUNT - 1) << "注销后计数器应更新";
+// }
+
+// // 混合操作压力测试
+// TEST_F(PhaseControllerTest, ConcurrencyStressTest) {
+//     const int TOTAL_OPERATIONS = 1000;
+//     std::vector<std::thread> threads;
+//     std::atomic<int> ops {0};
+
+//     auto worker = [&] {
+//         std::random_device rd;
+//         std::mt19937 gen(rd());
+//         std::uniform_int_distribution<> dist(0, 2);
+
+//         while (ops.fetch_add(1) < TOTAL_OPERATIONS) {
+//             switch (dist(gen)) {
+//                 case 0:
+//                     controller->arrive_and_await();
+//                     break;
+//                 case 1:
+//                     controller->register_party();
+//                     break;
+//                 case 2:
+//                     if (controller->parties() > 0) {
+//                         controller->arrive_and_deregister();
+//                     }
+//                     break;
+//             }
+//         }
+//     };
+
+//     // 启动工作线程
+//     for (int i = 0; i < 4; ++i) {
+//         threads.emplace_back(worker);
+//     }
+
+//     // 验证最终状态一致性
+//     for (auto& t : threads) t.join();
+
+//     EXPECT_GE(controller->parties(), 0) << "参与者数不应为负";
+//     EXPECT_GE(controller->count(), 0) << "计数器不应为负";
+//     EXPECT_GE(controller->generation(), 0) << "代际计数不应回退";
+// }
+
+// // 边界条件测试
+// TEST_F(PhaseControllerTest, EdgeCases) {
+//     // 测试空控制器
+//     auto empty_controller = std::make_shared<PhaseController>(0);
+//     EXPECT_EQ(empty_controller->arrive_and_await(), 0);
+
+//     // 测试单参与者
+//     auto single_controller = std::make_shared<PhaseController>(1);
+//     std::thread t([&] {
+//         EXPECT_EQ(single_controller->arrive_and_await(), 1);
+//     });
+//     t.join();
+//     EXPECT_EQ(single_controller->generation(), 1);
+
+//     // 测试多次注销
+//     auto controller = std::make_shared<PhaseController>(2);
+//     controller->arrive_and_deregister();
+//     controller->arrive_and_deregister();
+//     EXPECT_EQ(controller->parties(), 0);
+// }
+
+} // namespace vectorized
+} // namespace doris