blob: 886ec6cc426d8021d08a75ad6cd802d089fc0d19 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/thread.h
// and modified by Doris
#pragma once
#include <butil/macros.h>
#include <pthread.h>
#include <stdint.h>
#include <functional>
#include <string>
#include <utility>
#include "common/status.h"
#include "gutil/ref_counted.h"
#include "util/countdown_latch.h"
namespace doris {
class WebPageHandler;
class Thread : public RefCountedThreadSafe<Thread> {
public:
enum CreateFlags { NO_FLAGS = 0, NO_STACK_WATCHDOG = 1 };
template <class F>
static Status create_with_flags(const std::string& category, const std::string& name,
const F& f, uint64_t flags, scoped_refptr<Thread>* holder) {
return start_thread(category, name, f, flags, holder);
}
template <class F>
static Status create(const std::string& category, const std::string& name, const F& f,
scoped_refptr<Thread>* holder) {
return start_thread(category, name, f, NO_FLAGS, holder);
}
template <class F, class A1>
static Status create(const std::string& category, const std::string& name, const F& f,
const A1& a1, scoped_refptr<Thread>* holder) {
return start_thread(category, name, std::bind(f, a1), NO_FLAGS, holder);
}
template <class F, class A1, class A2>
static Status create(const std::string& category, const std::string& name, const F& f,
const A1& a1, const A2& a2, scoped_refptr<Thread>* holder) {
return start_thread(category, name, std::bind(f, a1, a2), NO_FLAGS, holder);
}
template <class F, class A1, class A2, class A3>
static Status create(const std::string& category, const std::string& name, const F& f,
const A1& a1, const A2& a2, const A3& a3, scoped_refptr<Thread>* holder) {
return start_thread(category, name, std::bind(f, a1, a2, a3), NO_FLAGS, holder);
}
template <class F, class A1, class A2, class A3, class A4>
static Status create(const std::string& category, const std::string& name, const F& f,
const A1& a1, const A2& a2, const A3& a3, const A4& a4,
scoped_refptr<Thread>* holder) {
return start_thread(category, name, std::bind(f, a1, a2, a3, a4), NO_FLAGS, holder);
}
template <class F, class A1, class A2, class A3, class A4, class A5>
static Status create(const std::string& category, const std::string& name, const F& f,
const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5,
scoped_refptr<Thread>* holder) {
return start_thread(category, name, std::bind(f, a1, a2, a3, a4, a5), NO_FLAGS, holder);
}
template <class F, class A1, class A2, class A3, class A4, class A5, class A6>
static Status create(const std::string& category, const std::string& name, const F& f,
const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5,
const A6& a6, scoped_refptr<Thread>* holder) {
return start_thread(category, name, std::bind(f, a1, a2, a3, a4, a5, a6), NO_FLAGS, holder);
}
static void set_self_name(const std::string& name);
#ifndef __APPLE__
static void set_idle_sched();
static void set_thread_nice_value();
#endif
~Thread();
// Blocks until this thread finishes execution. Once this method returns, the thread
// will be unregistered with the ThreadMgr and will not appear in the debug UI.
void join();
// The thread ID assigned to this thread by the operating system. If the thread
// has not yet started running, returns INVALID_TID.
//
// NOTE: this may block for a short amount of time if the thread has just been
// started.
int64_t tid() const;
// Returns the thread's pthread ID.
pthread_t pthread_id() const;
const std::string& name() const;
const std::string& category() const;
std::string to_string() const;
// The current thread of execution, or nullptr if the current thread isn't a doris::Thread.
// This call is signal-safe.
static Thread* current_thread();
// Returns a unique, stable identifier for this thread. Note that this is a static
// method and thus can be used on any thread, including the main thread of the
// process.
//
// In general, this should be used when a value is required that is unique to
// a thread and must work on any thread including the main process thread.
//
// NOTE: this is _not_ the TID, but rather a unique value assigned by the
// thread implementation. So, this value should not be presented to the user
// in log messages, etc.
static int64_t unique_thread_id();
// Returns the system thread ID (tid on Linux) for the current thread. Note
// that this is a static method and thus can be used from any thread,
// including the main thread of the process. This is in contrast to
// Thread::tid(), which only works on doris::Threads.
//
// Thread::tid() will return the same value, but the value is cached in the
// Thread object, so will be faster to call.
//
// Thread::unique_thread_id() (or Thread::tid()) should be preferred for
// performance sensitive code, however it is only guaranteed to return a
// unique and stable thread ID, not necessarily the system thread ID.
static int64_t current_thread_id();
private:
friend class ThreadJoiner;
enum {
INVALID_TID = -1,
PARENT_WAITING_TID = -2,
};
// User function to be executed by this thread.
typedef std::function<void()> ThreadFunctor;
Thread(const std::string& category, const std::string& name, ThreadFunctor functor)
: _thread(0),
_tid(INVALID_TID),
_functor(std::move(functor)),
_category(std::move(category)),
_name(std::move(name)),
_done(1),
_joinable(false) {}
// Library-specific thread ID.
pthread_t _thread;
// OS-specific thread ID. Once the constructor finishes start_thread(),
// guaranteed to be set either to a non-negative integer, or to INVALID_TID.
//
// The tid_ member goes through the following states:
// 1. INVALID_TID: the thread has not been started, or has already exited.
// 2. PARENT_WAITING_TID: the parent has started the thread, but the
// thread has not yet begun running. Therefore the TID is not yet known
// but it will be set once the thread starts.
// 3. <positive value>: the thread is running.
int64_t _tid;
const ThreadFunctor _functor;
const std::string _category;
const std::string _name;
// Joiners wait on this latch to be notified if the thread is done.
//
// Note that Joiners must additionally pthread_join(), otherwise certain
// resources that callers expect to be destroyed (like TLS) may still be
// alive when a Joiner finishes.
CountDownLatch _done;
bool _joinable;
// Thread local pointer to the current thread of execution. Will be nullptr if the current
// thread is not a Thread.
static __thread Thread* _tls;
// Wait for the running thread to publish its tid.
int64_t wait_for_tid() const;
// Starts the thread running supervise_thread(), and returns once that thread has
// initialised and its TID has been read. Waits for notification from the started
// thread that initialisation is complete before returning. On success, stores a
// reference to the thread in holder.
static Status start_thread(const std::string& category, const std::string& name,
const ThreadFunctor& functor, uint64_t flags,
scoped_refptr<Thread>* holder);
// Wrapper for the user-supplied function. Invoked from the new thread,
// with the Thread as its only argument. Executes _functor, but before
// doing so registers with the global ThreadMgr and reads the thread's
// system ID. After _functor terminates, unregisters with the ThreadMgr.
// Always returns nullptr.
//
// supervise_thread() notifies start_thread() when thread initialisation is
// completed via the _tid, which is set to the new thread's system ID.
// By that point in time supervise_thread() has also taken a reference to
// the Thread object, allowing it to safely refer to it even after the
// caller drops its reference.
//
// Additionally, start_thread() notifies supervise_thread() when the actual
// Thread object has been assigned (supervise_thread() is spinning during
// this time). Without this, the new thread may reference the actual
// Thread object before it has been assigned by start_thread(). See
// KUDU-11 for more details.
static void* supervise_thread(void* arg);
// Invoked when the user-supplied function finishes or in the case of an
// abrupt exit (i.e. pthread_exit()). Cleans up after supervise_thread().
static void finish_thread(void* arg);
static void init_threadmgr();
};
// Utility to join on a thread, printing warning messages if it
// takes too long. For example:
//
// ThreadJoiner(&my_thread, "processing thread")
// .warn_after_ms(1000)
// .warn_every_ms(5000)
// .Join();
//
// TODO: would be nice to offer a way to use ptrace() or signals to
// dump the stack trace of the thread we're trying to join on if it
// gets stuck. But, after looking for 20 minutes or so, it seems
// pretty complicated to get right.
class ThreadJoiner {
public:
explicit ThreadJoiner(Thread* thread);
// Start emitting warnings after this many milliseconds.
//
// Default: 1000 ms.
ThreadJoiner& warn_after_ms(int ms);
// After the warnings after started, emit another warning at the
// given interval.
//
// Default: 1000 ms.
ThreadJoiner& warn_every_ms(int ms);
// If the thread has not stopped after this number of milliseconds, give up
// joining on it and return Status::Aborted.
//
// -1 (the default) means to wait forever trying to join.
ThreadJoiner& give_up_after_ms(int ms);
// Join the thread, subject to the above parameters. If the thread joining
// fails for any reason, returns RuntimeError. If it times out, returns
// Aborted.
Status join();
private:
enum {
kDefaultWarnAfterMs = 1000,
kDefaultWarnEveryMs = 1000,
kDefaultGiveUpAfterMs = -1 // forever
};
Thread* _thread = nullptr;
int _warn_after_ms;
int _warn_every_ms;
int _give_up_after_ms;
DISALLOW_COPY_AND_ASSIGN(ThreadJoiner);
};
// Registers /threadz with the debug webserver.
void register_thread_display_page(WebPageHandler* web_page_handler);
} //namespace doris