blob: 0e3b472346ab33667684d87a9f245c10ada84691 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <pthread.h>

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <exception>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include "simple_sync_queue.h"
namespace doris::cloud {
class SimpleThreadPool {
private:
using JobType = std::function<void()>;
// a lock based sync queue
std::shared_ptr<SimpleSyncQueue<JobType>> _job_queue;
std::vector<std::thread> _worker_thread_group; // multi thread pool
std::atomic<bool> _is_running;
size_t _pool_size;
std::string _pool_name;
public:
SimpleThreadPool(size_t size, const std::string& name = "")
: _is_running(false), _pool_size(size), _pool_name(name) {
_job_queue = std::make_shared<SimpleSyncQueue<JobType>>(_pool_size * 2);
}
SimpleThreadPool() = delete;
/**
* Submits jobs, if job queue is full it will blocked
*
* @return 0 if succeeded
*/
int submit(JobType f) {
if (!_is_running) {
// not started
return -1;
}
_job_queue->emplace(std::move(f));
return 0;
}
/**
* Submits jobs with a limit blocking time
*
* @param timeout max blocking time in milliseconds
* @return 0 if success
* -1 if thread pool not start yet
* -2 if time out
*/
int submit_with_timeout(JobType f, int timeout) {
if (!_is_running) {
// not started
return -1;
}
if (!_job_queue->emplace_with_timeout(timeout, std::move(f))) {
return -2;
}
return 0;
}
/**
*
* @return always 0
*/
int start() {
_is_running = true;
_worker_thread_group.clear();
_pool_name = _pool_name.empty() ? "simple_thread_pool" : _pool_name;
for (size_t i = 0; i < _pool_size; ++i) {
_worker_thread_group.emplace_back(&SimpleThreadPool::work, this);
std::string name = _pool_name + "_" + std::to_string(i);
pthread_setname_np(_worker_thread_group.back().native_handle(), name.c_str());
}
return 0;
}
/**
* Stops to get jobs from job queue, the job being done will finish normally
*
* @return 0 if succeed, otherwise non-zero value returned
*/
int stop() {
if (!_is_running) {
// already stopped
return -1;
}
_is_running = false;
for (auto& i : _worker_thread_group) {
if (i.joinable()) {
i.join();
}
}
return 0;
}
/**
* Gets size of the pool
*
* @return the thread number(pool size) in the thread pool
*/
size_t size() const { return _pool_size; }
/**
* Terminates the thread pool immediately, jobs in the queue will not be done
* and the running threads will be detached
*
* @return 0 if succeed, otherwise non-zero value returned
*/
int terminate() {
if (!_is_running) {
// not running
return -1;
}
_is_running = false;
_job_queue->clear(); // abandon all jobs
for (auto& i : _worker_thread_group) {
i.detach();
}
return 0;
}
~SimpleThreadPool() {
if (_is_running) {
stop();
}
}
private:
/**
* Working thread
*/
void work() {
while (_is_running || !_job_queue->empty()) {
JobType job;
if (!_job_queue->get_with_timeout(&job, 10)) {
continue;
}
try {
job();
} catch (const std::exception& e) {
std::cerr << "exception happened when execute job. err: " << e.what() << std::endl;
} catch (...) {
std::cerr << "exception happened when execute job." << std::endl;
}
}
}
};
} // namespace doris::cloud