| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| // bthread - An M:N threading library to make applications more concurrent. |
| |
| // Date: Tue Jul 10 17:40:58 CST 2012 |
| |
| #ifndef BTHREAD_TASK_CONTROL_H |
| #define BTHREAD_TASK_CONTROL_H |
| |
| #ifndef NDEBUG |
| #include <iostream> // std::ostream |
| #endif |
| #include <signal.h> |
| #include <stddef.h> // size_t |
| #include <vector> |
| #include <array> |
| #include <memory> |
| #include "butil/atomicops.h" // butil::atomic |
| #include "bvar/bvar.h" // bvar::PassiveStatus |
| #include "bthread/task_tracer.h" |
| #include "bthread/task_meta.h" // TaskMeta |
| #include "bthread/work_stealing_queue.h" // WorkStealingQueue |
| #include "bthread/parking_lot.h" |
| |
| DECLARE_int32(task_group_ntags); |
| namespace bthread { |
| |
| class TaskGroup; |
| |
| // Controls all task groups. |
| class TaskControl { |
|     friend class TaskGroup; |
|     friend void wait_for_butex(void*); |
| #ifdef BRPC_BTHREAD_TRACER |
|     friend bthread_t init_for_pthread_stack_trace(); |
| #endif // BRPC_BTHREAD_TRACER |
| |
| public: |
|     TaskControl(); |
|     ~TaskControl(); |
| |
|     // Must be called before this object is used. |
|     // `nconcurrency' is the number of worker pthreads. |
|     int init(int nconcurrency); |
| |
|     // Create a TaskGroup in this control. |
|     TaskGroup* create_group(bthread_tag_t tag); |
| |
|     // Steal a task from a "random" group. |
|     bool steal_task(bthread_t* tid, size_t* seed, size_t offset); |
| |
|     // Tell other groups that `num_task' tasks were just added to the |
|     // caller's runqueue. |
|     void signal_task(int num_task, bthread_tag_t tag); |
| |
|     // Stop and join the worker threads in this TaskControl. |
|     void stop_and_join(); |
| |
|     // Number of worker threads. |
|     int concurrency() const { |
|         return _concurrency.load(butil::memory_order_acquire); |
|     } |
| |
|     // Current value of the per-tag group counter for `tag'. |
|     int concurrency(bthread_tag_t tag) const { |
|         return _tagged_ngroup[tag].load(butil::memory_order_acquire); |
|     } |
| |
|     void print_rq_sizes(std::ostream& os); |
| |
|     double get_cumulated_worker_time(); |
|     double get_cumulated_worker_time(bthread_tag_t tag); |
|     int64_t get_cumulated_switch_count(); |
|     int64_t get_cumulated_signal_count(); |
| |
|     // [Not thread safe] Add more worker threads. |
|     // Returns the number of workers actually added, which may be less |
|     // than |num|. |
|     int add_workers(int num, bthread_tag_t tag); |
| |
|     // Choose one TaskGroup (randomly right now). |
|     // Never returns NULL when called after init(). |
|     TaskGroup* choose_one_group(bthread_tag_t tag); |
| |
|     static int parse_cpuset(std::string value, std::vector<unsigned>& cpus); |
| |
|     static void bind_thread_to_cpu(pthread_t pthread, unsigned cpu_id); |
| |
| #ifdef BRPC_BTHREAD_TRACER |
|     // A stacktrace of a bthread can be helpful in debugging. |
|     void stack_trace(std::ostream& os, bthread_t tid); |
|     std::string stack_trace(bthread_t tid); |
| #endif // BRPC_BTHREAD_TRACER |
| |
|     // Push `tid' onto the priority queue indexed by `tag'. |
|     void push_priority_queue(bthread_tag_t tag, bthread_t tid) { |
|         _priority_queues[tag].push(tid); |
|     } |
| |
|     std::vector<bthread_t> get_living_bthreads(); |
| |
| private: |
|     using TaggedGroups = std::array<TaskGroup*, BTHREAD_MAX_CONCURRENCY>; |
|     using TaggedParkingLot = std::array<ParkingLot, BTHREAD_MAX_PARKINGLOT>; |
| |
|     // Add/Remove a TaskGroup. |
|     // Returns 0 on success, -1 otherwise. |
|     int _add_group(TaskGroup*, bthread_tag_t tag); |
|     int _destroy_group(TaskGroup*); |
| |
|     // Group array of `tag'. |
|     TaggedGroups& tag_group(bthread_tag_t tag) { |
|         return _tagged_groups[tag]; |
|     } |
| |
|     // Group counter of `tag'. |
|     butil::atomic<size_t>& tag_ngroup(int tag) { |
|         return _tagged_ngroup[tag]; |
|     } |
| |
|     // Parking lots of `tag'. |
|     TaggedParkingLot& tag_pl(bthread_tag_t tag) { |
|         return _tagged_pl[tag]; |
|     } |
| |
|     static void delete_task_group(void* arg); |
| |
|     static void* worker_thread(void* task_control); |
| |
|     template <typename F> |
|     void for_each_task_group(F const& f); |
| |
|     bvar::LatencyRecorder& exposed_pending_time(); |
|     bvar::LatencyRecorder* create_exposed_pending_time(); |
|     bvar::Adder<int64_t>& tag_nworkers(bthread_tag_t tag); |
|     bvar::Adder<int64_t>& tag_nbthreads(bthread_tag_t tag); |
| |
|     // NOTE: the declaration order of the data members below is kept as-is |
|     // on purpose; members are constructed in declaration order. |
|     std::vector<butil::atomic<size_t>> _tagged_ngroup; |
|     std::vector<TaggedGroups> _tagged_groups; |
|     butil::Mutex _modify_group_mutex; |
| |
|     // If not initialized, bvar would cause a coredump. |
|     butil::atomic<bool> _init; |
|     bool _stop; |
|     butil::atomic<int> _concurrency; |
|     std::vector<pthread_t> _workers; |
|     std::vector<unsigned> _cpus; |
|     butil::atomic<int> _next_worker_id; |
| |
|     bvar::Adder<int64_t> _nworkers; |
|     butil::Mutex _pending_time_mutex; |
|     butil::atomic<bvar::LatencyRecorder*> _pending_time; |
|     bvar::PassiveStatus<double> _cumulated_worker_time; |
|     bvar::PerSecond<bvar::PassiveStatus<double>> _worker_usage_second; |
|     bvar::PassiveStatus<int64_t> _cumulated_switch_count; |
|     bvar::PerSecond<bvar::PassiveStatus<int64_t>> _switch_per_second; |
|     bvar::PassiveStatus<int64_t> _cumulated_signal_count; |
|     bvar::PerSecond<bvar::PassiveStatus<int64_t>> _signal_per_second; |
|     bvar::PassiveStatus<std::string> _status; |
|     bvar::Adder<int64_t> _nbthreads; |
| |
|     // Per-tag counterparts of the statistics above, indexed by tag. |
|     std::vector<bvar::Adder<int64_t>*> _tagged_nworkers; |
|     std::vector<bvar::PassiveStatus<double>*> _tagged_cumulated_worker_time; |
|     std::vector<bvar::PerSecond<bvar::PassiveStatus<double>>*> _tagged_worker_usage_second; |
|     std::vector<bvar::Adder<int64_t>*> _tagged_nbthreads; |
| |
|     bool _enable_priority_queue; |
|     std::vector<WorkStealingQueue<bthread_t>> _priority_queues; |
| |
|     size_t _pl_num_of_each_tag; |
|     std::vector<TaggedParkingLot> _tagged_pl; |
| |
| #ifdef BRPC_BTHREAD_TRACER |
|     TaskTracer _task_tracer; |
| #endif // BRPC_BTHREAD_TRACER |
| |
| }; |
| |
| // Returns the recorder currently stored in _pending_time; when the stored |
| // pointer is null, returns the one produced by create_exposed_pending_time(). |
| inline bvar::LatencyRecorder& TaskControl::exposed_pending_time() { |
|     bvar::LatencyRecorder* const recorder = |
|         _pending_time.load(butil::memory_order_consume); |
|     if (recorder) { |
|         return *recorder; |
|     } |
|     return *create_exposed_pending_time(); |
| } |
| |
| // Returns the adder that the `tag' entry of _tagged_nworkers points to. |
| inline bvar::Adder<int64_t>& TaskControl::tag_nworkers(bthread_tag_t tag) { |
|     bvar::Adder<int64_t>* const workers_of_tag = _tagged_nworkers[tag]; |
|     return *workers_of_tag; |
| } |
| |
| // Returns the adder that the `tag' entry of _tagged_nbthreads points to. |
| inline bvar::Adder<int64_t>& TaskControl::tag_nbthreads(bthread_tag_t tag) { |
|     bvar::Adder<int64_t>* const bthreads_of_tag = _tagged_nbthreads[tag]; |
|     return *bthreads_of_tag; |
| } |
| |
| // Calls `f' on each TaskGroup pointer of every tag, visiting the first |
| // tag_ngroup(tag) entries of tag_group(tag). Does nothing while _init is |
| // false. |
| template <typename F> |
| inline void TaskControl::for_each_task_group(F const& f) { |
|     if (!_init.load(butil::memory_order_acquire)) { |
|         return; |
|     } |
|     for (size_t tag = 0; tag < _tagged_groups.size(); ++tag) { |
|         // The counter is read once per tag, before the groups are walked. |
|         const size_t group_count = |
|             tag_ngroup(tag).load(butil::memory_order_relaxed); |
|         TaggedGroups& tagged = tag_group(tag); |
|         for (size_t idx = 0; idx != group_count; ++idx) { |
|             f(tagged[idx]); |
|         } |
|     } |
| } |
| |
| } // namespace bthread |
| |
| #endif // BTHREAD_TASK_CONTROL_H |