// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/thread.cc
// and modified by Doris

#include "thread.h"

#include <sys/resource.h>

#ifndef __APPLE__
// IWYU pragma: no_include <bits/types/struct_sched_param.h>
#include <sched.h>
#include <sys/prctl.h>
#else
#include <pthread.h>

#include <cstdint>
#endif

// IWYU pragma: no_include <bthread/errno.h>
#include <errno.h> // IWYU pragma: keep
#include <sys/syscall.h>
#include <time.h>
#include <unistd.h>

#include <algorithm>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <cstring>
#include <functional>
#include <limits>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <vector>

#include "absl/strings/substitute.h"
#include "common/config.h"
#include "common/logging.h"
#include "http/web_page_handler.h"
#include "runtime/thread_context.h"
#include "util/easy_json.h"
#include "util/os_util.h"
#include "util/url_coding.h"

namespace doris {

class ThreadMgr;

__thread Thread* Thread::_tls = nullptr;

// Singleton instance of ThreadMgr. Only visible in this file, used only by Thread.
// // The Thread class adds a reference to thread_manager while it is supervising a thread so
// // that a race between the end of the process's main thread (and therefore the destruction
// // of thread_manager) and the end of a thread that tries to remove itself from the
// // manager after the destruction can be avoided.
static std::shared_ptr<ThreadMgr> thread_manager;
//
// Controls the single (lazy) initialization of thread_manager.
static std::once_flag once;

// A singleton class that tracks all live threads, and groups them together for easy
// auditing. Used only by Thread.
class ThreadMgr {
public:
    ThreadMgr() : _threads_started_metric(0), _threads_running_metric(0) {}

    ~ThreadMgr() {
        std::unique_lock<std::mutex> lock(_lock);
        _thread_categories.clear();
    }

    static void set_thread_name(const std::string& name, int64_t tid);

#ifndef __APPLE__
    static void set_idle_sched(int64_t tid);

    static void set_thread_nice_value(int64_t tid);
#endif

    // not the system TID, since pthread_t is less prone to being recycled.
    void add_thread(const pthread_t& pthread_id, const std::string& name,
                    const std::string& category, int64_t tid);

    // Removes a thread from the supplied category. If the thread has
    // already been removed, this is a no-op.
    void remove_thread(const pthread_t& pthread_id, const std::string& category);

    void display_thread_callback(const WebPageHandler::ArgumentMap& args, EasyJson* ej) const;

private:
    // Container class for any details we want to capture about a thread
    // TODO: Add start-time.
    // TODO: Track fragment ID.
    class ThreadDescriptor {
    public:
        ThreadDescriptor() {}
        ThreadDescriptor(std::string category, std::string name, int64_t thread_id)
                : _name(std::move(name)), _category(std::move(category)), _thread_id(thread_id) {}

        const std::string& name() const { return _name; }
        const std::string& category() const { return _category; }
        int64_t thread_id() const { return _thread_id; }

    private:
        std::string _name;
        std::string _category;
        int64_t _thread_id;
    };

    void summarize_thread_descriptor(const ThreadDescriptor& desc, EasyJson* ej) const;

    // A ThreadCategory is a set of threads that are logically related.
    // TODO: unordered_map is incompatible with pthread_t, but would be more
    // efficient here.
    typedef std::map<const pthread_t, ThreadDescriptor> ThreadCategory;

    // All thread categories, keyed on the category name.
    typedef std::map<std::string, ThreadCategory> ThreadCategoryMap;

    // Protects _thread_categories and thread metrics.
    mutable std::mutex _lock;

    // All thread categories that ever contained a thread, even if empty
    ThreadCategoryMap _thread_categories;

    // Counters to track all-time total number of threads, and the
    // current number of running threads.
    uint64_t _threads_started_metric;
    uint64_t _threads_running_metric;

    DISALLOW_COPY_AND_ASSIGN(ThreadMgr);
};

void ThreadMgr::set_thread_name(const std::string& name, int64_t tid) {
    if (tid == getpid()) {
        return;
    }
#ifdef __APPLE__
    int err = pthread_setname_np(name.c_str());
#else
    int err = prctl(PR_SET_NAME, name.c_str());
#endif
    if (err < 0 && errno != EPERM) {
        LOG(ERROR) << "set_thread_name";
    }
}

#ifndef __APPLE__
void ThreadMgr::set_idle_sched(int64_t tid) {
    if (tid == getpid()) {
        return;
    }
    struct sched_param sp = {.sched_priority = 0};
    int err = sched_setscheduler(0, SCHED_IDLE, &sp);
    if (err < 0 && errno != EPERM) {
        LOG(ERROR) << "set_thread_idle_sched";
    }
}

void ThreadMgr::set_thread_nice_value(int64_t tid) {
    if (tid == getpid()) {
        return;
    }
    // From Linux kernel:
    // In the current implementation, each unit of difference in the nice values of two
    // processes results in a factor of 1.25 in the degree to which the
    // scheduler favors the higher priority process.  This causes very
    // low nice values (+19) to truly provide little CPU to a process
    // whenever there is any other higher priority load on the system,
    // and makes high nice values (-20) deliver most of the CPU to
    // applications that require it (e.g., some audio applications).

    // Choose 5 as lower priority value, default is 0
    int err = setpriority(PRIO_PROCESS, 0, config::scan_thread_nice_value);
    if (err < 0 && errno != EPERM) {
        LOG(ERROR) << "set_thread_low_priority";
    }
}
#endif

void ThreadMgr::add_thread(const pthread_t& pthread_id, const std::string& name,
                           const std::string& category, int64_t tid) {
    std::unique_lock<std::mutex> l(_lock);
    _thread_categories[category][pthread_id] = ThreadDescriptor(category, name, tid);
    _threads_running_metric++;
    _threads_started_metric++;
}

void ThreadMgr::remove_thread(const pthread_t& pthread_id, const std::string& category) {
    std::unique_lock<std::mutex> l(_lock);
    auto category_it = _thread_categories.find(category);
    DCHECK(category_it != _thread_categories.end());
    category_it->second.erase(pthread_id);
    _threads_running_metric--;
}

void ThreadMgr::display_thread_callback(const WebPageHandler::ArgumentMap& args,
                                        EasyJson* ej) const {
    if (args.contains("group")) {
        const auto& category_name = args.at("group");
        bool requested_all = category_name == "all";
        ej->Set("requested_thread_group", EasyJson::kObject);
        (*ej)["group_name"] = escape_for_html_to_string(category_name);
        (*ej)["requested_all"] = requested_all;

        // The critical section is as short as possible so as to minimize the delay
        // imposed on new threads that acquire the lock in write mode.
        std::vector<ThreadDescriptor> descriptors_to_print;
        if (!requested_all) {
            std::unique_lock<std::mutex> l(_lock);
            if (!_thread_categories.contains(category_name)) {
                return;
            }
            for (const auto& elem : _thread_categories.at(category_name)) {
                descriptors_to_print.emplace_back(elem.second);
            }
        } else {
            std::unique_lock<std::mutex> l(_lock);
            for (const auto& category : _thread_categories) {
                for (const auto& elem : category.second) {
                    descriptors_to_print.emplace_back(elem.second);
                }
            }
        }

        EasyJson found = (*ej).Set("found", EasyJson::kObject);
        EasyJson threads = found.Set("threads", EasyJson::kArray);
        for (const auto& desc : descriptors_to_print) {
            summarize_thread_descriptor(desc, &threads);
        }
    } else {
        // List all thread groups and the number of threads running in each.
        std::vector<std::pair<std::string, uint64_t>> thread_categories_info;
        uint64_t running;
        {
            std::unique_lock<std::mutex> l(_lock);
            running = _threads_running_metric;
            thread_categories_info.reserve(_thread_categories.size());
            for (const auto& category : _thread_categories) {
                thread_categories_info.emplace_back(category.first, category.second.size());
            }

            (*ej)["total_threads_running"] = running;
            EasyJson groups = ej->Set("groups", EasyJson::kArray);
            for (const auto& elem : thread_categories_info) {
                std::string category_arg;
                url_encode(elem.first, &category_arg);
                EasyJson group = groups.PushBack(EasyJson::kObject);
                group["encoded_group_name"] = category_arg;
                group["group_name"] = elem.first;
                group["threads_running"] = elem.second;
            }
        }
    }
}

void ThreadMgr::summarize_thread_descriptor(const ThreadMgr::ThreadDescriptor& desc,
                                            EasyJson* ej) const {
    ThreadStats stats;
    Status status = get_thread_stats(desc.thread_id(), &stats);
    if (!status.ok()) {
        LOG(WARNING) << "Could not get per-thread statistics: " << status.to_string();
    }

    EasyJson thread = ej->PushBack(EasyJson::kObject);
    thread["thread_name"] = desc.name();
    thread["user_sec"] = static_cast<double>(stats.user_ns) / 1e9;
    thread["kernel_sec"] = static_cast<double>(stats.kernel_ns) / 1e9;
    thread["iowait_sec"] = static_cast<double>(stats.iowait_ns) / 1e9;
}

Thread::~Thread() {
    if (_joinable) {
        int ret = pthread_detach(_thread);
        CHECK_EQ(ret, 0);
    }
}

void Thread::set_self_name(const std::string& name) {
    ThreadMgr::set_thread_name(name, current_thread_id());
}

#ifndef __APPLE__
void Thread::set_idle_sched() {
    ThreadMgr::set_idle_sched(current_thread_id());
}

void Thread::set_thread_nice_value() {
    ThreadMgr::set_thread_nice_value(current_thread_id());
}
#endif

void Thread::join() {
    static_cast<void>(ThreadJoiner(this).join());
}

int64_t Thread::tid() const {
    int64_t t = _tid.load();
    if (t != PARENT_WAITING_TID) {
        return t;
    }
    return wait_for_tid();
}

pthread_t Thread::pthread_id() const {
    return _thread;
}

const std::string& Thread::name() const {
    return _name;
}

const std::string& Thread::category() const {
    return _category;
}

std::string Thread::to_string() const {
    return absl::Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), _name, _category);
}

Thread* Thread::current_thread() {
    return _tls;
}

int64_t Thread::unique_thread_id() {
#ifdef __APPLE__
    uint64_t tid;
    pthread_threadid_np(pthread_self(), &tid);
    return tid;
#else
    return static_cast<int64_t>(pthread_self());
#endif
}

int64_t Thread::current_thread_id() {
#ifdef __APPLE__
    uint64_t tid;
    pthread_threadid_np(nullptr, &tid);
    return tid;
#else
    return syscall(SYS_gettid);
#endif
}

int64_t Thread::wait_for_tid() const {
    int loop_count = 0;
    while (true) {
        int64_t t = _tid.load();
        if (t != PARENT_WAITING_TID) {
            return t;
        }
        // copied from boost::detail::yield
        int k = loop_count++;
        if (k < 32 || k & 1) {
            sched_yield();
        } else {
            // g++ -Wextra warns on {} or {0}
            struct timespec rqtp = {0, 0};

            // POSIX says that timespec has tv_sec and tv_nsec
            // But it doesn't guarantee order or placement

            rqtp.tv_sec = 0;
            rqtp.tv_nsec = 1000;

            nanosleep(&rqtp, 0);
        }
    }
}

Status Thread::start_thread(const std::string& category, const std::string& name,
                            const ThreadFunctor& functor, std::shared_ptr<Thread>* holder) {
    std::call_once(once, init_threadmgr);

    // Temporary reference for the duration of this function.
    auto t = std::make_shared<Thread>(category, name, functor);

    // Optional, and only set if the thread was successfully created.
    //
    // We have to set this before we even start the thread because it's
    // allowed for the thread functor to access 'holder'.
    if (holder) {
        *holder = t;
    }

    t->_tid = PARENT_WAITING_TID;

    // Add a reference count to the thread since SuperviseThread() needs to
    // access the thread object, and we have no guarantee that our caller
    // won't drop the reference as soon as we return. This is dereferenced
    // in FinishThread().
    t->_shared_self = t;

    int ret = pthread_create(&t->_thread, nullptr, &Thread::supervise_thread, t.get());
    if (ret) {
        // If we failed to create the thread, we need to undo all of our prep work.
        t->_tid = INVALID_TID;
        t->_shared_self.reset();
        return Status::RuntimeError("Could not create thread. (error {}) {}", ret, strerror(ret));
    }

    // The thread has been created and is now joinable.
    //
    // Why set this in the parent and not the child? Because only the parent
    // (or someone communicating with the parent) can join, so joinable must
    // be set before the parent returns.
    t->_joinable = true;
    VLOG_NOTICE << "Started thread " << t->tid() << " - " << category << ":" << name;
    return Status::OK();
}

void* Thread::supervise_thread(void* arg) {
    Thread* t = static_cast<Thread*>(arg);
    int64_t system_tid = Thread::current_thread_id();
    PCHECK(system_tid != -1);

    // Take an additional reference to the thread manager, which we'll need below.
    std::shared_ptr<ThreadMgr> thread_mgr_ref = thread_manager;

    // Set up the TLS.
    //
    // We could store a ptr in the TLS itself, but as its
    // lifecycle is poorly defined, we'll use a bare pointer. We
    // already incremented the reference count in StartThread.
    Thread::_tls = t;

    // Create thread context, there is no need to create it when func is executed.
    ThreadLocalHandle::create_thread_local_if_not_exits();

    // Publish our tid to '_tid', which unblocks any callers waiting in
    // WaitForTid().
    t->_tid.store(system_tid);

    std::string name = absl::Substitute("$0-$1", t->name(), system_tid);
    ThreadMgr::set_thread_name(name, t->_tid);
    thread_manager->add_thread(pthread_self(), name, t->category(), t->_tid);

    // FinishThread() is guaranteed to run (even if functor_ throws an
    // exception) because pthread_cleanup_push() creates a scoped object
    // whose destructor invokes the provided callback.
    pthread_cleanup_push(&Thread::finish_thread, t);
    t->_functor();
    pthread_cleanup_pop(true);

    return nullptr;
}

void Thread::finish_thread(void* arg) {
    Thread* t = static_cast<Thread*>(arg);

    // We're here either because of the explicit pthread_cleanup_pop() in
    // SuperviseThread() or through pthread_exit(). In either case,
    // thread_manager is guaranteed to be live because thread_mgr_ref in
    // SuperviseThread() is still live.
    thread_manager->remove_thread(pthread_self(), t->category());

    // Signal any Joiner that we're done.
    t->_done.count_down();

    VLOG_CRITICAL << "Ended thread " << t->_tid << " - " << t->category() << ":" << t->name();
    t->_shared_self.reset();
    // NOTE: the above 'Release' call could be the last reference to 'this',
    // so 'this' could be destructed at this point. Do not add any code
    // following here!

    ThreadLocalHandle::del_thread_local_if_count_is_zero();
}

void Thread::init_threadmgr() {
    thread_manager.reset(new ThreadMgr());
}

ThreadJoiner::ThreadJoiner(Thread* thr)
        : _thread(CHECK_NOTNULL(thr)),
          _warn_after_ms(kDefaultWarnAfterMs),
          _warn_every_ms(kDefaultWarnEveryMs),
          _give_up_after_ms(kDefaultGiveUpAfterMs) {}

ThreadJoiner& ThreadJoiner::warn_after_ms(int ms) {
    _warn_after_ms = ms;
    return *this;
}

ThreadJoiner& ThreadJoiner::warn_every_ms(int ms) {
    _warn_every_ms = ms;
    return *this;
}

ThreadJoiner& ThreadJoiner::give_up_after_ms(int ms) {
    _give_up_after_ms = ms;
    return *this;
}

Status ThreadJoiner::join() {
    if (Thread::current_thread() && Thread::current_thread()->tid() == _thread->tid()) {
        return Status::InvalidArgument("Can't join on own thread. (error {}) {}", -1,
                                       _thread->_name);
    }

    // Early exit: double join is a no-op.
    if (!_thread->_joinable) {
        return Status::OK();
    }

    int waited_ms = 0;
    bool keep_trying = true;
    while (keep_trying) {
        if (waited_ms >= _warn_after_ms) {
            LOG(WARNING) << absl::Substitute("Waited for $0ms trying to join with $1 (tid $2)",
                                             waited_ms, _thread->_name, _thread->_tid.load());
        }

        int remaining_before_giveup = std::numeric_limits<int>::max();
        if (_give_up_after_ms != -1) {
            remaining_before_giveup = _give_up_after_ms - waited_ms;
        }

        int remaining_before_next_warn = _warn_every_ms;
        if (waited_ms < _warn_after_ms) {
            remaining_before_next_warn = _warn_after_ms - waited_ms;
        }

        if (remaining_before_giveup < remaining_before_next_warn) {
            keep_trying = false;
        }

        int wait_for = std::min(remaining_before_giveup, remaining_before_next_warn);

        if (_thread->_done.wait_for(std::chrono::milliseconds(wait_for))) {
            // Unconditionally join before returning, to guarantee that any TLS
            // has been destroyed (pthread_key_create() destructors only run
            // after a pthread's user method has returned).
            int ret = pthread_join(_thread->_thread, nullptr);
            CHECK_EQ(ret, 0);
            _thread->_joinable = false;
            return Status::OK();
        }
        waited_ms += wait_for;
    }
    return Status::Aborted("Timed out after {}ms joining on {}", waited_ms, _thread->_name);
}

void register_thread_display_page(WebPageHandler* web_page_handler) {
    web_page_handler->register_template_page(
            "/threadz", "Threads",
            std::bind(&ThreadMgr::display_thread_callback, thread_manager.get(),
                      std::placeholders::_1, std::placeholders::_2),
            true);
}
} // namespace doris
