| /* |
| * Copyright 2024-present Alibaba Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "paimon/executor.h" |
| |
| #include <condition_variable> |
| #include <functional> |
| #include <mutex> |
| #include <queue> |
| #include <thread> |
| #include <utility> |
| #include <vector> |
| |
| namespace paimon { |
| |
| class DefaultExecutor : public Executor { |
| public: |
| explicit DefaultExecutor(uint32_t thread_count); |
| ~DefaultExecutor() override; |
| |
| void Add(std::function<void()> func) override; |
| |
| private: |
| void WorkerThread(); |
| |
| uint32_t thread_count_; |
| std::vector<std::thread> workers_; |
| std::queue<std::function<void()>> tasks_; |
| std::mutex queue_mutex_; |
| std::condition_variable condition_; |
| bool stop_ = false; |
| int32_t active_tasks_ = 0; |
| }; |
| |
| DefaultExecutor::DefaultExecutor(uint32_t thread_count) : thread_count_(thread_count) { |
| for (uint32_t i = 0; i < thread_count_; ++i) { |
| workers_.emplace_back(&DefaultExecutor::WorkerThread, this); |
| } |
| } |
| |
| DefaultExecutor::~DefaultExecutor() { |
| { |
| std::unique_lock<std::mutex> lock(queue_mutex_); |
| stop_ = true; |
| condition_.notify_all(); |
| } |
| for (std::thread& worker : workers_) { |
| worker.join(); |
| } |
| } |
| |
| void DefaultExecutor::Add(std::function<void()> func) { |
| if (!func) { |
| return; |
| } |
| { |
| std::unique_lock<std::mutex> lock(queue_mutex_); |
| if (stop_) { |
| return; |
| } |
| tasks_.emplace(std::move(func)); |
| } |
| condition_.notify_one(); |
| } |
| |
| void DefaultExecutor::WorkerThread() { |
| while (true) { |
| std::function<void()> task; |
| { |
| std::unique_lock<std::mutex> lock(queue_mutex_); |
| condition_.wait(lock, [this] { return stop_ || !tasks_.empty(); }); |
| if (stop_ && tasks_.empty() && active_tasks_ == 0) { |
| condition_.notify_all(); |
| return; |
| } |
| if (!tasks_.empty()) { |
| task = std::move(tasks_.front()); |
| tasks_.pop(); |
| ++active_tasks_; |
| } |
| } |
| if (task) { |
| task(); |
| std::unique_lock<std::mutex> lock(queue_mutex_); |
| --active_tasks_; |
| if (tasks_.empty() && active_tasks_ == 0) { |
| condition_.notify_all(); |
| } |
| } |
| } |
| } |
| |
| PAIMON_EXPORT std::shared_ptr<Executor> GetGlobalDefaultExecutor() { |
| static uint32_t all_cores = std::thread::hardware_concurrency(); |
| static std::shared_ptr<Executor> internal = |
| std::make_shared<DefaultExecutor>(/*thread_count=*/all_cores); |
| return internal; |
| } |
| |
| PAIMON_EXPORT std::unique_ptr<Executor> CreateDefaultExecutor() { |
| return CreateDefaultExecutor(DEFAULT_EXECUTOR_THREAD_COUNT); |
| } |
| |
| PAIMON_EXPORT std::unique_ptr<Executor> CreateDefaultExecutor(uint32_t thread_count) { |
| return std::make_unique<DefaultExecutor>(thread_count); |
| } |
| |
| } // namespace paimon |