// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// Copied from Impala and adapted to Kudu.

#include "kudu/util/thread.h"

#if defined(__linux__)
#include <sys/prctl.h>
#endif // defined(__linux__)
#include <sys/resource.h>
#include <sys/time.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <cstring>
#include <memory>
#include <mutex>
#include <sstream>
#include <unordered_map>
#include <utility>
#include <vector>

#include <boost/smart_ptr/shared_ptr.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>

#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/mathlimits.h"
#include "kudu/gutil/once.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/easy_json.h"
#include "kudu/util/env.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/kernel_stack_watchdog.h"
#include "kudu/util/locks.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/os-util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/trace.h"
#include "kudu/util/url-coding.h"
#include "kudu/util/web_callback_registry.h"

using std::ostringstream;
using std::pair;
using std::shared_ptr;
using std::string;
using std::vector;
using std::unordered_map;
using strings::Substitute;

METRIC_DEFINE_gauge_uint64(server, threads_started,
                           "Threads Started",
                           kudu::MetricUnit::kThreads,
                           "Total number of threads started on this server",
                           kudu::MetricLevel::kDebug,
                           kudu::EXPOSE_AS_COUNTER);

METRIC_DEFINE_gauge_uint64(server, threads_running,
                           "Threads Running",
                           kudu::MetricUnit::kThreads,
                           "Current number of running threads",
                           kudu::MetricLevel::kInfo);

METRIC_DEFINE_gauge_uint64(server, cpu_utime,
                           "User CPU Time",
                           kudu::MetricUnit::kMilliseconds,
                           "Total user CPU time of the process",
                           kudu::MetricLevel::kInfo,
                           kudu::EXPOSE_AS_COUNTER);

METRIC_DEFINE_gauge_uint64(server, cpu_stime,
                           "System CPU Time",
                           kudu::MetricUnit::kMilliseconds,
                           "Total system CPU time of the process",
                           kudu::MetricLevel::kInfo,
                           kudu::EXPOSE_AS_COUNTER);

METRIC_DEFINE_gauge_uint64(server, voluntary_context_switches,
                           "Voluntary Context Switches",
                           kudu::MetricUnit::kContextSwitches,
                           "Total voluntary context switches",
                           kudu::MetricLevel::kInfo,
                           kudu::EXPOSE_AS_COUNTER);

METRIC_DEFINE_gauge_uint64(server, involuntary_context_switches,
                           "Involuntary Context Switches",
                           kudu::MetricUnit::kContextSwitches,
                           "Total involuntary context switches",
                           kudu::MetricLevel::kInfo,
                           kudu::EXPOSE_AS_COUNTER);

DEFINE_int32(thread_inject_start_latency_ms, 0,
             "Number of ms to sleep when starting a new thread. (For tests).");
TAG_FLAG(thread_inject_start_latency_ms, hidden);
TAG_FLAG(thread_inject_start_latency_ms, unsafe);

namespace kudu {

static uint64_t GetCpuUTime() {
  struct rusage ru;
  CHECK_ERR(getrusage(RUSAGE_SELF, &ru));
  return ru.ru_utime.tv_sec * 1000UL + ru.ru_utime.tv_usec / 1000UL;
}

static uint64_t GetCpuSTime() {
  struct rusage ru;
  CHECK_ERR(getrusage(RUSAGE_SELF, &ru));
  return ru.ru_stime.tv_sec * 1000UL + ru.ru_stime.tv_usec / 1000UL;
}

static uint64_t GetVoluntaryContextSwitches() {
  struct rusage ru;
  CHECK_ERR(getrusage(RUSAGE_SELF, &ru));
  return ru.ru_nvcsw;
}

static uint64_t GetInVoluntaryContextSwitches() {
  struct rusage ru;
  CHECK_ERR(getrusage(RUSAGE_SELF, &ru));
  return ru.ru_nivcsw;
}

class ThreadMgr;

__thread Thread* Thread::tls_ = nullptr;

// Singleton instance of ThreadMgr. Only visible in this file, used only by Thread.
// The Thread class adds a reference to thread_manager while it is supervising a thread so
// that a race between the end of the process's main thread (and therefore the destruction
// of thread_manager) and the end of a thread that tries to remove itself from the
// manager after the destruction can be avoided.
static shared_ptr<ThreadMgr> thread_manager;

// Controls the single (lazy) initialization of thread_manager.
static GoogleOnceType once = GOOGLE_ONCE_INIT;

// A singleton class that tracks all live threads, and groups them together for easy
// auditing. Used only by Thread.
class ThreadMgr {
 public:
  ThreadMgr()
      : threads_started_metric_(0),
        threads_running_metric_(0) {
  }

  ~ThreadMgr() {
    thread_categories_.clear();
  }

  static void SetThreadName(const string& name, int64_t tid);

  Status StartInstrumentation(const scoped_refptr<MetricEntity>& metrics,
                              WebCallbackRegistry* web) const;

  // Registers a thread to the supplied category. The key is a pthread_t,
  // not the system TID, since pthread_t is less prone to being recycled.
  void AddThread(const pthread_t& pthread_id, const string& name, const string& category,
      int64_t tid);

  // Removes a thread from the supplied category. If the thread has
  // already been removed, this is a no-op.
  void RemoveThread(const pthread_t& pthread_id, const string& category);

  // Metric callback for number of threads running. Also used for error messages.
  uint64_t ReadThreadsRunning() const;

 private:
  // Container class for any details we want to capture about a thread
  // TODO: Add start-time.
  // TODO: Track fragment ID.
  class ThreadDescriptor {
   public:
    ThreadDescriptor() { }
    ThreadDescriptor(string category, string name, int64_t thread_id)
        : name_(std::move(name)),
          category_(std::move(category)),
          thread_id_(thread_id) {}

    const string& name() const { return name_; }
    const string& category() const { return category_; }
    int64_t thread_id() const { return thread_id_; }

    struct Comparator {
      bool operator()(const ThreadDescriptor& rhs, const ThreadDescriptor& lhs) const {
        return rhs.name() < lhs.name();
      }
    };

   private:
    string name_;
    string category_;
    int64_t thread_id_;
  };

  struct ThreadIdHash {
    size_t operator()(pthread_t thread_id) const noexcept {
      return std::hash<pthread_t>()(thread_id);
    }
  };

  struct ThreadIdEqual {
    bool operator()(pthread_t lhs, pthread_t rhs) const {
      return pthread_equal(lhs, rhs) != 0;
    }
  };

  // A ThreadCategory is a set of threads that are logically related.
  typedef unordered_map<const pthread_t, ThreadDescriptor,
                        ThreadIdHash, ThreadIdEqual> ThreadCategory;

  // All thread categories, keyed on the category name.
  typedef unordered_map<string, ThreadCategory> ThreadCategoryMap;

  // Protects thread_categories_ and thread metrics.
  mutable rw_spinlock lock_;

  // All thread categories that ever contained a thread, even if empty.
  ThreadCategoryMap thread_categories_;

  // Counters to track all-time total number of threads, and the
  // current number of running threads.
  uint64_t threads_started_metric_;
  uint64_t threads_running_metric_;

  // Metric callback for number of threads started.
  uint64_t ReadThreadsStarted() const;

  // Webpage callback; prints all threads by category.
  void ThreadPathHandler(const WebCallbackRegistry::WebRequest& req,
                         WebCallbackRegistry::WebResponse* resp) const;
  void SummarizeThreadDescriptor(const ThreadDescriptor& desc,
                                 EasyJson* output) const;
};

void ThreadMgr::SetThreadName(const string& name, int64_t tid) {
  // On linux we can get the thread names to show up in the debugger by setting
  // the process name for the LWP.  We don't want to do this for the main
  // thread because that would rename the process, causing tools like killall
  // to stop working.
  if (tid == getpid()) {
    return;
  }

#if defined(__linux__)
  // http://0pointer.de/blog/projects/name-your-threads.html
  // Set the name for the LWP (which gets truncated to 15 characters).
  // Note that glibc also has a 'pthread_setname_np' api, but it may not be
  // available everywhere and it's only benefit over using prctl directly is
  // that it can set the name of threads other than the current thread.
  int err = prctl(PR_SET_NAME, name.c_str());
#else
  int err = pthread_setname_np(name.c_str());
#endif // defined(__linux__)
  // We expect EPERM failures in sandboxed processes, just ignore those.
  if (err < 0 && errno != EPERM) {
    PLOG(ERROR) << "SetThreadName";
  }
}

Status ThreadMgr::StartInstrumentation(const scoped_refptr<MetricEntity>& metrics,
                                       WebCallbackRegistry* web) const {
  // Use function gauges here so that we can register a unique copy of these metrics in
  // multiple tservers, even though the ThreadMgr is itself a singleton.
  metrics->NeverRetire(
      METRIC_threads_started.InstantiateFunctionGauge(
          metrics, [this]() { return this->ReadThreadsStarted(); }));
  metrics->NeverRetire(
      METRIC_threads_running.InstantiateFunctionGauge(
          metrics, [this]() { return this->ReadThreadsRunning(); }));
  metrics->NeverRetire(
      METRIC_cpu_utime.InstantiateFunctionGauge(
          metrics, []() { return GetCpuUTime(); }));
  metrics->NeverRetire(
      METRIC_cpu_stime.InstantiateFunctionGauge(
          metrics, []() { return GetCpuSTime(); }));
  metrics->NeverRetire(
      METRIC_voluntary_context_switches.InstantiateFunctionGauge(
          metrics, []() { return GetVoluntaryContextSwitches(); }));
  metrics->NeverRetire(
      METRIC_involuntary_context_switches.InstantiateFunctionGauge(
          metrics, []() { return GetInVoluntaryContextSwitches(); }));

  if (web) {
    DCHECK_NOTNULL(web)->RegisterPathHandler(
        "/threadz", "Threads", [this](const WebCallbackRegistry::WebRequest& req,
                                      WebCallbackRegistry::WebResponse* resp) {
          this->ThreadPathHandler(req, resp);
        },
        /* is_styled= */ true,
        /* is_on_nav_bar= */ true);
  }
  return Status::OK();
}

uint64_t ThreadMgr::ReadThreadsStarted() const {
  shared_lock<decltype(lock_)> l(lock_);
  return threads_started_metric_;
}

uint64_t ThreadMgr::ReadThreadsRunning() const {
  shared_lock<decltype(lock_)> l(lock_);
  return threads_running_metric_;
}

void ThreadMgr::AddThread(const pthread_t& pthread_id, const string& name,
    const string& category, int64_t tid) {
  // These annotations cause TSAN to ignore the synchronization on lock_
  // without causing the subsequent mutations to be treated as data races
  // in and of themselves (that's what IGNORE_READS_AND_WRITES does).
  //
  // Why do we need them here and in SuperviseThread()? TSAN operates by
  // observing synchronization events and using them to establish "happens
  // before" relationships between threads. Where these relationships are
  // not built, shared state access constitutes a data race. The
  // synchronization events here, in RemoveThread(), and in
  // SuperviseThread() may cause TSAN to establish a "happens before"
  // relationship between thread functors, ignoring potential data races.
  // The annotations prevent this from happening.
  ANNOTATE_IGNORE_SYNC_BEGIN();
  ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
  {
    // NOTE: Not using EmplaceOrDie() here -- that's because in environments
    //   where fork() is called after some threads have been spawned, child
    //   processes will inadvertently inherit the contents of the thread
    //   registry (i.e. the entries in the thread_categories_ container).
    //   For some platforms, pthread_t handles for threads in different
    //   processes might be the same, so using EmplaceOrDie() would induce
    //   a crash when ThreadMgr::AddThread() is called for a new thread
    //   in the child process.
    //
    // TODO(aserbin): maybe, keep the thread_categories_ registry not in a
    //   global static container, but bind the container with the life cycle
    //   of some top-level object that uses the ThreadMgr as a singleton.
    std::lock_guard<decltype(lock_)> l(lock_);
    thread_categories_[category][pthread_id] =
        ThreadDescriptor(category, name, tid);
    ++threads_running_metric_;
    ++threads_started_metric_;
  }
  ANNOTATE_IGNORE_SYNC_END();
  ANNOTATE_IGNORE_READS_AND_WRITES_END();
}

void ThreadMgr::RemoveThread(const pthread_t& pthread_id, const string& category) {
  ANNOTATE_IGNORE_SYNC_BEGIN();
  ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
  {
    std::lock_guard<decltype(lock_)> l(lock_);
    auto& threads = FindOrDie(thread_categories_, category);
    auto num_erased = threads.erase(pthread_id);
    CHECK_EQ(1, num_erased);
    --threads_running_metric_;
  }
  ANNOTATE_IGNORE_SYNC_END();
  ANNOTATE_IGNORE_READS_AND_WRITES_END();
}

void ThreadMgr::SummarizeThreadDescriptor(const ThreadDescriptor& desc,
                                          EasyJson* output) const {
  ThreadStats stats;
  Status status = GetThreadStats(desc.thread_id(), &stats);
  if (!status.ok()) {
    KLOG_EVERY_N(INFO, 100) << "Could not get per-thread statistics: "
                            << status.ToString();
  }
  EasyJson thr = output->PushBack(EasyJson::kObject);
  thr["thread_name"] = desc.name();
  thr["user_sec"] = static_cast<double>(stats.user_ns) / 1e9;
  thr["kernel_sec"] = static_cast<double>(stats.kernel_ns) / 1e9;
  thr["iowait_sec"] = static_cast<double>(stats.iowait_ns) / 1e9;
}

void ThreadMgr::ThreadPathHandler(const WebCallbackRegistry::WebRequest& req,
                                  WebCallbackRegistry::WebResponse* resp) const {
  EasyJson& output = resp->output;
  const auto* category_name = FindOrNull(req.parsed_args, "group");
  if (category_name) {
    // List all threads belonging to the desired thread group.
    bool requested_all = *category_name == "all";
    EasyJson rtg = output.Set("requested_thread_group", EasyJson::kObject);
    rtg["group_name"] = EscapeForHtmlToString(*category_name);
    rtg["requested_all"] = requested_all;

    // The critical section is as short as possible so as to minimize the delay
    // imposed on new threads that acquire the lock in write mode.
    vector<ThreadDescriptor> descriptors_to_print;
    if (!requested_all) {
      shared_lock<decltype(lock_)> l(lock_);
      const auto* category = FindOrNull(thread_categories_, *category_name);
      if (!category) {
        return;
      }
      for (const auto& elem : *category) {
        descriptors_to_print.emplace_back(elem.second);
      }
    } else {
      shared_lock<decltype(lock_)> l(lock_);
      for (const auto& category : thread_categories_) {
        for (const auto& elem : category.second) {
          descriptors_to_print.emplace_back(elem.second);
        }
      }
    }

    EasyJson found = rtg.Set("found", EasyJson::kObject);
    EasyJson threads = found.Set("threads", EasyJson::kArray);
    for (const auto& desc : descriptors_to_print) {
      SummarizeThreadDescriptor(desc, &threads);
    }
  } else {
    // List all thread groups and the number of threads running in each.
    vector<pair<string, uint64_t>> thread_categories_info;
    uint64_t running;
    {
      // See comment above regarding short critical sections.
      shared_lock<decltype(lock_)> l(lock_);
      running = threads_running_metric_;
      thread_categories_info.reserve(thread_categories_.size());
      for (const auto& category : thread_categories_) {
        thread_categories_info.emplace_back(category.first, category.second.size());
      }
    }

    output["total_threads_running"] = running;
    EasyJson groups = output.Set("groups", EasyJson::kArray);
    for (const auto& elem : thread_categories_info) {
      string category_arg;
      if (WebCallbackRegistry::IsProxiedViaKnox(req)) {
        // Knox encodes query parameter values when it rewrites HTTP responses.
        // If we also encoded, we'd end up with broken URLs. For example, we'd
        // encode the query parameter 'group=service pool' to
        // 'group=service%20pool', then Knox would encode it again to
        // 'group=service%2520pool'.
        category_arg = elem.first;
      } else {
        UrlEncode(elem.first, &category_arg);
      }
      EasyJson g = groups.PushBack(EasyJson::kObject);
      g["encoded_group_name"] = category_arg;
      g["group_name"] = elem.first;
      g["threads_running"] = elem.second;
    }
  }
}

static void InitThreading() {
  thread_manager.reset(new ThreadMgr());
}

Status StartThreadInstrumentation(const scoped_refptr<MetricEntity>& server_metrics,
                                  WebCallbackRegistry* web) {
  GoogleOnceInit(&once, &InitThreading);
  return thread_manager->StartInstrumentation(server_metrics, web);
}

ThreadJoiner::ThreadJoiner(Thread* thr)
  : thread_(CHECK_NOTNULL(thr)),
    warn_after_ms_(kDefaultWarnAfterMs),
    warn_every_ms_(kDefaultWarnEveryMs),
    give_up_after_ms_(kDefaultGiveUpAfterMs) {
}

ThreadJoiner& ThreadJoiner::warn_after_ms(int ms) {
  warn_after_ms_ = ms;
  return *this;
}

ThreadJoiner& ThreadJoiner::warn_every_ms(int ms) {
  warn_every_ms_ = ms;
  return *this;
}

ThreadJoiner& ThreadJoiner::give_up_after_ms(int ms) {
  give_up_after_ms_ = ms;
  return *this;
}

Status ThreadJoiner::Join() {
  if (Thread::current_thread() &&
      Thread::current_thread()->tid() == thread_->tid()) {
    return Status::InvalidArgument("Can't join on own thread", thread_->name_);
  }

  // Early exit: double join is a no-op.
  if (!thread_->joinable_) {
    return Status::OK();
  }

  int waited_ms = 0;
  bool keep_trying = true;
  while (keep_trying) {
    if (waited_ms >= warn_after_ms_) {
      LOG(WARNING) << Substitute("Waited for $0ms trying to join with $1 (tid $2)",
                                 waited_ms, thread_->name_, thread_->tid_);
    }

    int remaining_before_giveup = MathLimits<int>::kMax;
    if (give_up_after_ms_ != -1) {
      remaining_before_giveup = give_up_after_ms_ - waited_ms;
    }

    int remaining_before_next_warn = warn_every_ms_;
    if (waited_ms < warn_after_ms_) {
      remaining_before_next_warn = warn_after_ms_ - waited_ms;
    }

    if (remaining_before_giveup < remaining_before_next_warn) {
      keep_trying = false;
    }

    int wait_for = std::min(remaining_before_giveup, remaining_before_next_warn);

    if (thread_->done_.WaitFor(MonoDelta::FromMilliseconds(wait_for))) {
      // Unconditionally join before returning, to guarantee that any TLS
      // has been destroyed (pthread_key_create() destructors only run
      // after a pthread's user method has returned).
      int ret = pthread_join(thread_->thread_, nullptr);
      CHECK_EQ(ret, 0);
      thread_->joinable_ = false;
      return Status::OK();
    }
    waited_ms += wait_for;
  }
  return Status::Aborted(strings::Substitute("Timed out after $0ms joining on $1",
                                             waited_ms, thread_->name_));
}

Thread::~Thread() {
  if (joinable_) {
    int ret = pthread_detach(thread_);
    CHECK_EQ(ret, 0);
  }
}

string Thread::ToString() const {
  return Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), name_, category_);
}

int64_t Thread::WaitForTid() const {
  const string log_prefix = Substitute("$0 ($1) ", name_, category_);
  SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix,
                                   "waiting for new thread to publish its TID");
  int loop_count = 0;
  while (true) {
    int64_t t = Acquire_Load(&tid_);
    if (t != PARENT_WAITING_TID) return t;
    boost::detail::yield(loop_count++);
  }
}


Status Thread::StartThread(string category, string name,
                           std::function<void()> functor, uint64_t flags,
                           scoped_refptr<Thread> *holder) {
  TRACE_COUNTER_INCREMENT("threads_started", 1);
  TRACE_COUNTER_SCOPE_LATENCY_US("thread_start_us");
  GoogleOnceInit(&once, &InitThreading);

  const string log_prefix = Substitute("$0 ($1) ", name, category);
  SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "starting thread");

  // Temporary reference for the duration of this function.
  scoped_refptr<Thread> t(new Thread(
      std::move(category), std::move(name), std::move(functor)));

  // Optional, and only set if the thread was successfully created.
  //
  // We have to set this before we even start the thread because it's
  // allowed for the thread functor to access 'holder'.
  if (holder) {
    *holder = t;
  }

  t->tid_ = PARENT_WAITING_TID;

  // Add a reference count to the thread since SuperviseThread() needs to
  // access the thread object, and we have no guarantee that our caller
  // won't drop the reference as soon as we return. This is dereferenced
  // in FinishThread().
  t->AddRef();

  auto cleanup = MakeScopedCleanup([&]() {
      // If we failed to create the thread, we need to undo all of our prep work.
      t->tid_ = INVALID_TID;
      t->Release();
    });

  if (PREDICT_FALSE(FLAGS_thread_inject_start_latency_ms > 0)) {
    LOG(INFO) << "Injecting " << FLAGS_thread_inject_start_latency_ms << "ms sleep on thread start";
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_thread_inject_start_latency_ms));
  }

  {
    SCOPED_LOG_SLOW_EXECUTION_PREFIX(WARNING, 500 /* ms */, log_prefix, "creating pthread");
    SCOPED_WATCH_STACK((flags & NO_STACK_WATCHDOG) ? 0 : 250);
    int ret = pthread_create(&t->thread_, nullptr, &Thread::SuperviseThread, t.get());
    if (ret) {
      string msg = "";
      if (ret == EAGAIN) {
        uint64_t rlimit_nproc = Env::Default()->GetResourceLimit(
            Env::ResourceLimitType::RUNNING_THREADS_PER_EUID);
        uint64_t num_threads = thread_manager->ReadThreadsRunning();
        msg = Substitute(" ($0 Kudu-managed threads running in this process, "
                         "$1 max processes allowed for current user)",
                         num_threads, rlimit_nproc);
      }
      return Status::RuntimeError(Substitute("Could not create thread$0", msg), strerror(ret), ret);
    }
  }

  // The thread has been created and is now joinable.
  //
  // Why set this in the parent and not the child? Because only the parent
  // (or someone communicating with the parent) can join, so joinable must
  // be set before the parent returns.
  t->joinable_ = true;
  cleanup.cancel();

  VLOG(2) << Substitute("Started thread $0 - $1: $2", t->tid(), t->category(), t->name());
  return Status::OK();
}

void* Thread::SuperviseThread(void* arg) {
  Thread* t = static_cast<Thread*>(arg);
  int64_t system_tid = Thread::CurrentThreadId();
  PCHECK(system_tid != -1);

  // Take an additional reference to the thread manager, which we'll need below.
  ANNOTATE_IGNORE_SYNC_BEGIN();
  shared_ptr<ThreadMgr> thread_mgr_ref = thread_manager;
  ANNOTATE_IGNORE_SYNC_END();

  // Set up the TLS.
  //
  // We could store a scoped_refptr in the TLS itself, but as its
  // lifecycle is poorly defined, we'll use a bare pointer. We
  // already incremented the reference count in StartThread.
  Thread::tls_ = t;

  // Publish our tid to 'tid_', which unblocks any callers waiting in
  // WaitForTid().
  Release_Store(&t->tid_, system_tid);

  string name = strings::Substitute("$0-$1", t->name(), system_tid);
  thread_manager->SetThreadName(name, t->tid_);
  thread_manager->AddThread(pthread_self(), name, t->category(), t->tid_);

  // FinishThread() is guaranteed to run (even if functor_ throws an
  // exception) because pthread_cleanup_push() creates a scoped object
  // whose destructor invokes the provided callback.
  pthread_cleanup_push(&Thread::FinishThread, t);
  t->functor_();
  pthread_cleanup_pop(true);

  return nullptr;
}

void Thread::FinishThread(void* arg) {
  Thread* t = static_cast<Thread*>(arg);

  // We're here either because of the explicit pthread_cleanup_pop() in
  // SuperviseThread() or through pthread_exit(). In either case,
  // thread_manager is guaranteed to be live because thread_mgr_ref in
  // SuperviseThread() is still live.
  thread_manager->RemoveThread(pthread_self(), t->category());

  // Signal any Joiner that we're done.
  t->done_.CountDown();

  VLOG(2) << "Ended thread " << t->tid_ << " - " << t->category() << ":" << t->name();
  t->Release();
  // NOTE: the above 'Release' call could be the last reference to 'this',
  // so 'this' could be destructed at this point. Do not add any code
  // following here!
}

} // namespace kudu
