| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #ifndef IMPALA_UTIL_THREAD_POOL_H |
| #define IMPALA_UTIL_THREAD_POOL_H |
| |
| #include "util/blocking-queue.h" |
| |
| #include <boost/thread/mutex.hpp> |
| #include <boost/bind/mem_fn.hpp> |
| |
| #include "util/aligned-new.h" |
| #include "util/condition-variable.h" |
| #include "util/thread.h" |
| |
| namespace impala { |
| |
| /// Simple threadpool which processes items (of type T) in parallel which were placed on a |
| /// blocking queue by Offer(). Each item is processed by a single user-supplied method. |
| template <typename T> |
| class ThreadPool : public CacheLineAligned { |
| public: |
| /// Signature of a work-processing function. Takes the integer id of the thread which is |
| /// calling it (ids run from 0 to num_threads - 1) and a reference to the item to |
| /// process. |
| typedef boost::function<void (int thread_id, const T& workitem)> WorkFunction; |
| |
| /// Creates a new thread pool without starting any threads. Code must call |
| /// Init() on this thread pool before any calls to Offer(). |
| /// -- num_threads: how many threads are part of this pool |
| /// -- queue_size: the maximum size of the queue on which work items are offered. If the |
| /// queue exceeds this size, subsequent calls to Offer will block until there is |
| /// capacity available. |
| /// -- work_function: the function to run every time an item is consumed from the queue |
| /// -- fault_injection_eligible - If set to true, allow fault injection at this |
| /// callsite (see thread_creation_fault_injection). If set to false, fault |
| /// injection is diabled at this callsite. Thread creation sites that crash |
| /// Impala or abort startup must have this set to false. |
| ThreadPool(const std::string& group, const std::string& thread_prefix, |
| uint32_t num_threads, uint32_t queue_size, const WorkFunction& work_function, |
| bool fault_injection_eligible = false) |
| : group_(group), thread_prefix_(thread_prefix), num_threads_(num_threads), |
| work_function_(work_function), work_queue_(queue_size), |
| fault_injection_eligible_(fault_injection_eligible) {} |
| |
| /// Destructor ensures that all threads are terminated before this object is freed |
| /// (otherwise they may continue to run and reference member variables) |
| virtual ~ThreadPool() { |
| Shutdown(); |
| Join(); |
| } |
| |
| /// Create the threads needed for this ThreadPool. Returns an error on any |
| /// error spawning the threads. |
| Status Init() { |
| for (int i = 0; i < num_threads_; ++i) { |
| std::stringstream threadname; |
| threadname << thread_prefix_ << "(" << i + 1 << ":" << num_threads_ << ")"; |
| std::unique_ptr<Thread> t; |
| Status status = Thread::Create(group_, threadname.str(), |
| boost::bind<void>(boost::mem_fn(&ThreadPool<T>::WorkerThread), this, i), &t, |
| fault_injection_eligible_); |
| if (!status.ok()) { |
| // The thread pool initialization failed. Shutdown any threads that were |
| // spawned. Note: Shutdown() and Join() are safe to call multiple times. |
| Shutdown(); |
| Join(); |
| return status; |
| } |
| threads_.AddThread(std::move(t)); |
| } |
| initialized_ = true; |
| return Status::OK(); |
| } |
| |
| /// Blocking operation that puts a work item on the queue. If the queue is full, blocks |
| /// until there is capacity available. The ThreadPool must be initialized before |
| /// calling this method. |
| // |
| /// 'work' is copied into the work queue, but may be referenced at any time in the |
| /// future. Therefore the caller needs to ensure that any data referenced by work (if T |
| /// is, e.g., a pointer type) remains valid until work has been processed, and it's up to |
| /// the caller to provide their own signalling mechanism to detect this (or to wait until |
| /// after DrainAndShutdown returns). |
| // |
| /// Returns true if the work item was successfully added to the queue, false otherwise |
| /// (which typically means that the thread pool has already been shut down). |
| template <typename V> |
| bool Offer(V&& work) { |
| DCHECK(initialized_); |
| return work_queue_.BlockingPut(std::forward<V>(work)); |
| } |
| |
| /// Blocks until the work item is placed on the queue or the timeout expires. The |
| /// ThreadPool must be initialized before calling this method. The same requirements |
| /// about the lifetime of 'work' applies as in Offer() above. If the operation times |
| /// out, the work item can be safely freed. |
| /// |
| /// Returns true if the work item was successfully added to the queue, false otherwise |
| /// (which means the operation timed out or the thread pool has already been shut down). |
| template <typename V> |
| bool Offer(V&& work, int64_t timeout_millis) { |
| DCHECK(initialized_); |
| int64_t timeout_micros = timeout_millis * MICROS_PER_MILLI; |
| return work_queue_.BlockingPutWithTimeout(std::forward<V>(work), timeout_micros); |
| } |
| |
| /// Shuts the thread pool down, causing the work queue to cease accepting offered work |
| /// and the worker threads to terminate once they have processed their current work item. |
| /// Returns once the shutdown flag has been set, does not wait for the threads to |
| /// terminate. |
| void Shutdown() { |
| { |
| boost::lock_guard<boost::mutex> l(lock_); |
| shutdown_ = true; |
| } |
| work_queue_.Shutdown(); |
| } |
| |
| /// Blocks until all threads are finished. Shutdown does not need to have been called, |
| /// since it may be called on a separate thread. |
| void Join() { |
| threads_.JoinAll(); |
| } |
| |
| uint32_t GetQueueSize() const { |
| return work_queue_.Size(); |
| } |
| |
| /// Blocks until the work queue is empty, and then calls Shutdown to stop the worker |
| /// threads and Join to wait until they are finished. |
| /// Any work Offer()'ed during DrainAndShutdown may or may not be processed. |
| void DrainAndShutdown() { |
| { |
| boost::unique_lock<boost::mutex> l(lock_); |
| // If the ThreadPool is not initialized, then the queue must be empty. |
| DCHECK(initialized_ || work_queue_.Size() == 0); |
| while (work_queue_.Size() != 0) { |
| empty_cv_.Wait(l); |
| } |
| } |
| Shutdown(); |
| Join(); |
| } |
| |
| private: |
| /// Driver method for each thread in the pool. Continues to read work from the queue |
| /// until the pool is shutdown. |
| void WorkerThread(int thread_id) { |
| while (!IsShutdown()) { |
| T workitem; |
| if (work_queue_.BlockingGet(&workitem)) { |
| work_function_(thread_id, workitem); |
| } |
| if (work_queue_.Size() == 0) { |
| /// Take lock to ensure that DrainAndShutdown() cannot be between checking |
| /// GetSize() and wait()'ing when the condition variable is notified. |
| /// (It will hang if we notify right before calling wait().) |
| boost::unique_lock<boost::mutex> l(lock_); |
| empty_cv_.NotifyAll(); |
| } |
| } |
| } |
| |
| /// Returns value of shutdown_ under a lock, forcing visibility to threads in the pool. |
| bool IsShutdown() { |
| boost::lock_guard<boost::mutex> l(lock_); |
| return shutdown_; |
| } |
| |
| /// Group string to tag threads for this pool |
| const std::string group_; |
| |
| /// Thread name prefix |
| const std::string thread_prefix_; |
| |
| /// The number of threads to start in this pool |
| uint32_t num_threads_; |
| |
| /// User-supplied method to call to process each work item. |
| WorkFunction work_function_; |
| |
| /// Queue on which work items are held until a thread is available to process them in |
| /// FIFO order. |
| BlockingQueue<T> work_queue_; |
| |
| /// Whether this ThreadPool will tolerate failure by aborting a query. This means |
| /// it is safe to inject errors for Init(). |
| bool fault_injection_eligible_; |
| |
| /// Collection of worker threads that process work from the queue. |
| ThreadGroup threads_; |
| |
| /// Guards shutdown_ and empty_cv_ |
| boost::mutex lock_; |
| |
| /// Set to true when Init() has finished spawning the threads. |
| bool initialized_ = false; |
| |
| /// Set to true when threads should stop doing work and terminate. |
| bool shutdown_ = false; |
| |
| /// Signalled when the queue becomes empty |
| ConditionVariable empty_cv_; |
| }; |
| |
| /// Utility thread-pool that accepts callable work items, and simply invokes them. |
| class CallableThreadPool : public ThreadPool<boost::function<void()>> { |
| public: |
| CallableThreadPool(const std::string& group, const std::string& thread_prefix, |
| uint32_t num_threads, uint32_t queue_size) : |
| ThreadPool<boost::function<void()>>( |
| group, thread_prefix, num_threads, queue_size, &CallableThreadPool::Worker) { |
| } |
| |
| private: |
| static void Worker(int thread_id, const boost::function<void()>& f) { |
| f(); |
| } |
| }; |
| |
| /// Parent class for all synchronous work items |
| /// |
| /// Important note for all subclasses: |
| /// All fields need to have a lifetime that matches this operation's lifetime. |
| /// In particular, caller-provided arguments need to be moved, copied, or need to have a |
| /// lifetime independent of the caller. The monitored operation must survive the caller |
| /// timing out and potentially deallocating memory. |
| class SynchronousWorkItem { |
| public: |
| virtual ~SynchronousWorkItem() {} |
| |
| /// Customized implementation for each operation. Subclasses must override this. |
| /// The status is conveyed back to the original caller. |
| virtual Status Execute() { |
| DCHECK(false) << "Execute() must be implemented"; |
| return Status("Execute() must be implemented"); |
| } |
| |
| virtual std::string GetDescription() { |
| DCHECK(false) << "GetDescription() must be implemented"; |
| return ""; |
| } |
| |
| private: |
| friend class SynchronousThreadPool; |
| |
| /// This is called by the worker thread and handles the mechanics of notifying |
| /// the caller and conveying status when Execute() completes. |
| void WorkerExecute() { |
| Status status = Execute(); |
| DCHECK(!done_promise_.IsSet()); |
| discard_result(done_promise_.Set(status)); |
| } |
| |
| /// Wait for the operation to complete or time out with the specified limit |
| /// 'timeout_millis' given that the caller has already waited 'time_used_millis'. |
| /// If the operation times out, it returns TErrorCode::THREAD_POOL_TASK_TIMED_OUT. |
| /// Otherwise, it returns the status returned by Execute(). |
| Status Wait(int64_t timeout_millis, int64_t time_used_millis) { |
| // If the time used has already exceeded the timeout, go directly to returning |
| // THREAD_POOL_TASK_TIMED_OUT. |
| bool timed_out = (time_used_millis >= timeout_millis); |
| Status status; |
| if (!timed_out) { |
| int64_t timeout_left_millis = timeout_millis - time_used_millis; |
| status = done_promise_.Get(timeout_left_millis, &timed_out); |
| } |
| if (timed_out) { |
| // IMPALA-7946: Always throw an error using the original timeout, not the |
| // timeout remaining. |
| Status timeout_status = Status(TErrorCode::THREAD_POOL_TASK_TIMED_OUT, |
| GetDescription(), timeout_millis / MILLIS_PER_SEC); |
| LOG(WARNING) << timeout_status.GetDetail(); |
| return timeout_status; |
| } |
| return status; |
| } |
| |
| // Set to the return status of ExecuteImpl() upon completion |
| Promise<Status> done_promise_; |
| }; |
| |
| /// Synchronous thread pool can run any subclass of SynchronousWorkItem |
| /// Ownership is shared between the caller side and the worker side: |
| /// 1. The caller accesses the operation to wait for its completion (or timeout) and |
| /// retrieve any result. |
| /// 2. The ThreadPool worker calls Execute() on the operation and needs to maintain |
| /// ownership until the operation completes. The blocking queue inside the ThreadPool |
| /// also needs to maintain ownership until a thread removes the operation for |
| /// processing. |
| /// This is an awkward circumstance to have exclusive ownership. The caller can time |
| /// out and leave while the worker is processing. When the worker completes and could |
| /// release ownership, the caller might still need to retrieve the result or the caller |
| /// might be gone. shared_ptr does what we want: the HdfsMonitorOp will survive until |
| /// neither thread needs it anymore. |
| class SynchronousThreadPool : public ThreadPool<std::shared_ptr<SynchronousWorkItem>> { |
| public: |
| SynchronousThreadPool(const std::string& group, const std::string& thread_prefix, |
| uint32_t num_threads, uint32_t queue_size) : |
| ThreadPool<std::shared_ptr<SynchronousWorkItem>>(group, thread_prefix, num_threads, |
| queue_size, &SynchronousThreadPool::Worker) {} |
| |
| /// Run the provided work item and wait up to 'timeout_milliseconds' for the |
| /// operation to complete. If it completes, return the status from the work |
| /// item's Execute() function. Otherwise, return an error status: |
| /// - THREAD_POOL_TASK_TIMED_OUT if the individual task timed out |
| /// - THREAD_POOL_SUBMIT_FAILED if all the threads are busy and the task did not |
| /// even start |
| Status SynchronousOffer(std::shared_ptr<SynchronousWorkItem> work, |
| int64_t timeout_milliseconds) { |
| MonotonicStopWatch offer_timer; |
| offer_timer.Start(); |
| bool offer_success = Offer(work, timeout_milliseconds); |
| offer_timer.Stop(); |
| if (!offer_success) { |
| // This scenario only happens when all threads are occupied and the queue |
| // is full. This means the system is in a catastrophic state. Log to ERROR. |
| Status failed_to_submit_status = |
| Status(TErrorCode::THREAD_POOL_SUBMIT_FAILED, work->GetDescription(), |
| timeout_milliseconds / MILLIS_PER_SEC); |
| LOG(ERROR) << failed_to_submit_status.GetDetail(); |
| return failed_to_submit_status; |
| } |
| |
| int64_t time_used_millis = |
| offer_timer.ElapsedTime() / (NANOS_PER_MICRO * MICROS_PER_MILLI); |
| return work->Wait(timeout_milliseconds, time_used_millis); |
| } |
| |
| private: |
| static void Worker(int thread_id, const std::shared_ptr<SynchronousWorkItem>& work) { |
| work->WorkerExecute(); |
| } |
| }; |
| |
| } |
| |
| #endif |