blob: 4cd7a4baad907c2d3dd167f6c6f2c32e1f3153e7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <bthread/bthread.h>
#include <bthread/types.h>
#include <string>
#include <thread>
#include "common/exception.h"
#include "runtime/workload_management/resource_context.h"
#include "runtime/memory/thread_mem_tracker_mgr.h"
namespace doris {
class ThreadContext;
extern bthread_key_t btls_key;
// Is true after ThreadContext construction.
inline thread_local bool pthread_context_ptr_init = false;
inline thread_local constinit ThreadContext* thread_context_ptr = nullptr;
// The thread context saves some info about a working thread.
// 2 required info:
// 1. thread_id: Current thread id, Auto generated.
// 2. type(abolished): The type is a enum value indicating which type of task current thread is running.
// For example: QUERY, LOAD, COMPACTION, ...
// 3. task id: A unique id to identify this task. maybe query id, load job id, etc.
// 4. ThreadMemTrackerMgr
//
// There may be other optional info to be added later.
class ThreadContext {
public:
ThreadContext() { thread_mem_tracker_mgr = std::make_unique<ThreadMemTrackerMgr>(); }
~ThreadContext() = default;
void attach_task(const std::shared_ptr<ResourceContext>& rc) {
resource_ctx = rc;
thread_mem_tracker_mgr->attach_task(rc->memory_context()->memtracker_limiter(), rc->workload_group_context()->workload_group());
}
void detach_task() {
resource_ctx.reset();
thread_mem_tracker_mgr->detach_task();
}
static std::string get_thread_id() {
std::stringstream ss;
ss << std::this_thread::get_id();
return ss.str();
}
// Note that if set global Memory Hook, After thread_mem_tracker_mgr is initialized,
// the current thread Hook starts to consume/release mem_tracker.
// the use of shared_ptr will cause a crash. The guess is that there is an
// intermediate state during the copy construction of shared_ptr. Shared_ptr is not equal
// to nullptr, but the object it points to is not initialized. At this time, when the memory
// is released somewhere, the hook is triggered to cause the crash.
std::unique_ptr<ThreadMemTrackerMgr> thread_mem_tracker_mgr;
std::shared_ptr<ResourceContext> resource_ctx;
int thread_local_handle_count = 0;
};
class ThreadLocalHandle {
public:
static void create_thread_local_if_not_exits() {
if (bthread_self() == 0) {
if (!pthread_context_ptr_init) {
thread_context_ptr = new ThreadContext();
pthread_context_ptr_init = true;
}
DCHECK(thread_context_ptr != nullptr);
thread_context_ptr->thread_local_handle_count++;
} else {
// Avoid calling bthread_getspecific frequently to get bthread local.
// Very frequent bthread_getspecific will slow, but create_thread_local_if_not_exits is not expected to be much.
// Cache the pointer of bthread local in pthead local.
auto* bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
if (bthread_context == nullptr) {
// If bthread_context == nullptr:
// 1. First call to bthread_getspecific (and before any bthread_setspecific) returns NULL
// 2. There are not enough reusable btls in btls pool.
// else if bthread_context != nullptr:
// 1. A new bthread starts, but get a reuses btls.
bthread_context = new ThreadContext;
// The brpc server should respond as quickly as possible.
bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
// set the data so that next time bthread_getspecific in the thread returns the data.
CHECK(0 == bthread_setspecific(btls_key, bthread_context) || doris::k_doris_exit);
}
DCHECK(bthread_context != nullptr);
bthread_context->thread_local_handle_count++;
}
}
// `create_thread_local_if_not_exits` and `del_thread_local_if_count_is_zero` should be used in pairs,
// `del_thread_local_if_count_is_zero` should only be called if `create_thread_local_if_not_exits` returns true
static void del_thread_local_if_count_is_zero() {
if (pthread_context_ptr_init) {
// in pthread
thread_context_ptr->thread_local_handle_count--;
if (thread_context_ptr->thread_local_handle_count == 0) {
pthread_context_ptr_init = false;
delete doris::thread_context_ptr;
thread_context_ptr = nullptr;
}
} else if (bthread_self() != 0) {
// in bthread
auto* bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
DCHECK(bthread_context != nullptr);
bthread_context->thread_local_handle_count--;
} else {
throw Exception(Status::FatalError("__builtin_unreachable"));
}
}
};
// must call create_thread_local_if_not_exits() before use thread_context().
static ThreadContext* thread_context(bool allow_return_null = false) {
if (pthread_context_ptr_init) {
// in pthread
DCHECK(bthread_self() == 0);
DCHECK(thread_context_ptr != nullptr);
return thread_context_ptr;
}
if (bthread_self() != 0) {
// in bthread
// bthread switching pthread may be very frequent, remember not to use lock or other time-consuming operations.
auto* bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
DCHECK(bthread_context != nullptr && bthread_context->thread_local_handle_count > 0);
return bthread_context;
}
if (allow_return_null) {
return nullptr;
}
// It means that use thread_context() but this thread not attached a query/load using SCOPED_ATTACH_TASK macro.
throw Exception(
Status::FatalError("__builtin_unreachable, {}", doris::memory_orphan_check_msg));
}
} // namespace doris