| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| // IWYU pragma: no_include <bthread/errno.h> |
| #include <errno.h> // IWYU pragma: keep |
| #include <gen_cpp/HeartbeatService_types.h> |
| #include <gen_cpp/Metrics_types.h> |
| #include <stdint.h> |
| #include <stdlib.h> |
| #include <string.h> |
| #include <sys/resource.h> |
| |
| #include <limits> |
| #include <map> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "io/fs/file_meta_cache.h" |
| #include "olap/olap_define.h" |
| #include "olap/options.h" |
| #include "olap/page_cache.h" |
| #include "olap/rowset/segment_v2/inverted_index_cache.h" |
| #include "olap/schema_cache.h" |
| #include "olap/segment_loader.h" |
| #include "pipeline/task_queue.h" |
| #include "pipeline/task_scheduler.h" |
| #include "runtime/block_spill_manager.h" |
| #include "runtime/broker_mgr.h" |
| #include "runtime/cache/result_cache.h" |
| #include "runtime/client_cache.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/external_scan_context_mgr.h" |
| #include "runtime/fragment_mgr.h" |
| #include "runtime/heartbeat_flags.h" |
| #include "runtime/load_channel_mgr.h" |
| #include "runtime/load_path_mgr.h" |
| #include "runtime/memory/cache_manager.h" |
| #include "runtime/memory/mem_tracker.h" |
| #include "runtime/memory/mem_tracker_limiter.h" |
| #include "runtime/memory/thread_mem_tracker_mgr.h" |
| #include "runtime/result_buffer_mgr.h" |
| #include "runtime/result_queue_mgr.h" |
| #include "runtime/routine_load/routine_load_task_executor.h" |
| #include "runtime/runtime_query_statistics_mgr.h" |
| #include "runtime/small_file_mgr.h" |
| #include "runtime/stream_load/new_load_stream_mgr.h" |
| #include "runtime/stream_load/stream_load_executor.h" |
| #include "runtime/task_group/task_group_manager.h" |
| #include "runtime/thread_context.h" |
| #include "service/point_query_executor.h" |
| #include "util/bfd_parser.h" |
| #include "util/bit_util.h" |
| #include "util/brpc_client_cache.h" |
| #include "util/cpu_info.h" |
| #include "util/dns_cache.h" |
| #include "util/doris_metrics.h" |
| #include "util/mem_info.h" |
| #include "util/metrics.h" |
| #include "util/parse_util.h" |
| #include "util/pretty_printer.h" |
| #include "util/threadpool.h" |
| #include "util/timezone_utils.h" |
| #include "vec/exec/scan/scanner_scheduler.h" |
| #include "vec/runtime/vdata_stream_mgr.h" |
| |
| #if !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && \ |
| !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC) |
| #include "runtime/memory/tcmalloc_hook.h" |
| #endif |
| |
| namespace doris { |
| // Forward declarations of the brpc stub types; in this file they are only used as the |
| // template arguments of BrpcClientCache<> when the client caches are created in _init(). |
| class PBackendService_Stub; |
| class PFunctionService_Stub; |
| |
| // Unit-less gauge metric prototypes for the thread pools owned by ExecEnv. |
| // The hooks that feed them are installed in ExecEnv::_register_metrics() and removed in |
| // ExecEnv::_deregister_metrics(). |
| // NOTE(review): scanner_thread_pool_queue_size is only deregistered in this file, never |
| // registered -- confirm where (or whether) its hook is installed. |
| DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, MetricUnit::NOUNIT); |
| DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, MetricUnit::NOUNIT); |
| DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit::NOUNIT); |
| DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_thread_num, MetricUnit::NOUNIT); |
| DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(download_cache_thread_pool_queue_size, MetricUnit::NOUNIT); |
| |
| Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths) { |
| return env->_init(store_paths); |
| } |
| |
| Status ExecEnv::_init(const std::vector<StorePath>& store_paths) { |
| //Only init once before be destroyed |
| if (_is_init) { |
| return Status::OK(); |
| } |
| _store_paths = store_paths; |
| |
| _external_scan_context_mgr = new ExternalScanContextMgr(this); |
| _vstream_mgr = new doris::vectorized::VDataStreamMgr(); |
| _result_mgr = new ResultBufferMgr(); |
| _result_queue_mgr = new ResultQueueMgr(); |
| _backend_client_cache = new BackendServiceClientCache(config::max_client_cache_size_per_host); |
| _frontend_client_cache = new FrontendServiceClientCache(config::max_client_cache_size_per_host); |
| _broker_client_cache = new BrokerServiceClientCache(config::max_client_cache_size_per_host); |
| |
| TimezoneUtils::load_timezones_to_cache(); |
| |
| ThreadPoolBuilder("SendBatchThreadPool") |
| .set_min_threads(config::send_batch_thread_pool_thread_num) |
| .set_max_threads(config::send_batch_thread_pool_thread_num) |
| .set_max_queue_size(config::send_batch_thread_pool_queue_size) |
| .build(&_send_batch_thread_pool); |
| |
| init_download_cache_required_components(); |
| |
| ThreadPoolBuilder("BufferedReaderPrefetchThreadPool") |
| .set_min_threads(16) |
| .set_max_threads(64) |
| .build(&_buffered_reader_prefetch_thread_pool); |
| |
| // min num equal to fragment pool's min num |
| // max num is useless because it will start as many as requested in the past |
| // queue size is useless because the max thread num is very large |
| ThreadPoolBuilder("SendReportThreadPool") |
| .set_min_threads(config::fragment_pool_thread_num_min) |
| .set_max_threads(std::numeric_limits<int>::max()) |
| .set_max_queue_size(config::fragment_pool_queue_size) |
| .build(&_send_report_thread_pool); |
| |
| ThreadPoolBuilder("JoinNodeThreadPool") |
| .set_min_threads(config::fragment_pool_thread_num_min) |
| .set_max_threads(std::numeric_limits<int>::max()) |
| .set_max_queue_size(config::fragment_pool_queue_size) |
| .build(&_join_node_thread_pool); |
| |
| _runtime_query_statistics_mgr = new RuntimeQueryStatiticsMgr(); |
| RETURN_IF_ERROR(init_pipeline_task_scheduler()); |
| _task_group_manager = new taskgroup::TaskGroupManager(); |
| _scanner_scheduler = new doris::vectorized::ScannerScheduler(); |
| _fragment_mgr = new FragmentMgr(this); |
| _result_cache = new ResultCache(config::query_cache_max_size_mb, |
| config::query_cache_elasticity_size_mb); |
| _master_info = new TMasterInfo(); |
| _load_path_mgr = new LoadPathMgr(this); |
| _bfd_parser = BfdParser::create(); |
| _broker_mgr = new BrokerMgr(this); |
| _load_channel_mgr = new LoadChannelMgr(); |
| _new_load_stream_mgr = NewLoadStreamMgr::create_shared(); |
| _internal_client_cache = new BrpcClientCache<PBackendService_Stub>(); |
| _function_client_cache = new BrpcClientCache<PFunctionService_Stub>(); |
| _stream_load_executor = StreamLoadExecutor::create_shared(this); |
| _routine_load_task_executor = new RoutineLoadTaskExecutor(this); |
| RETURN_IF_ERROR(_routine_load_task_executor->init()); |
| _small_file_mgr = new SmallFileMgr(this, config::small_file_dir); |
| _block_spill_mgr = new BlockSpillManager(_store_paths); |
| _file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num); |
| |
| _dns_cache = new DNSCache(); |
| _backend_client_cache->init_metrics("backend"); |
| _frontend_client_cache->init_metrics("frontend"); |
| _broker_client_cache->init_metrics("broker"); |
| _result_mgr->init(); |
| Status status = _load_path_mgr->init(); |
| if (!status.ok()) { |
| LOG(ERROR) << "load path mgr init failed." << status; |
| exit(-1); |
| } |
| _broker_mgr->init(); |
| _small_file_mgr->init(); |
| _scanner_scheduler->init(this); |
| |
| _init_mem_env(); |
| |
| RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit())); |
| _heartbeat_flags = new HeartbeatFlags(); |
| _register_metrics(); |
| _is_init = true; |
| return Status::OK(); |
| } |
| |
| Status ExecEnv::init_pipeline_task_scheduler() { |
| auto executors_size = config::pipeline_executor_size; |
| if (executors_size <= 0) { |
| executors_size = CpuInfo::num_cores(); |
| } |
| |
| // TODO pipeline task group combie two blocked schedulers. |
| auto t_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size); |
| auto b_scheduler = std::make_shared<pipeline::BlockedTaskScheduler>(t_queue); |
| _pipeline_task_scheduler = |
| new pipeline::TaskScheduler(this, b_scheduler, t_queue, "WithoutGroupTaskSchePool"); |
| RETURN_IF_ERROR(_pipeline_task_scheduler->start()); |
| |
| auto tg_queue = std::make_shared<pipeline::TaskGroupTaskQueue>(executors_size); |
| auto tg_b_scheduler = std::make_shared<pipeline::BlockedTaskScheduler>(tg_queue); |
| _pipeline_task_group_scheduler = |
| new pipeline::TaskScheduler(this, tg_b_scheduler, tg_queue, "WithGroupTaskSchePool"); |
| RETURN_IF_ERROR(_pipeline_task_group_scheduler->start()); |
| |
| return Status::OK(); |
| } |
| |
| Status ExecEnv::_init_mem_env() { |
| bool is_percent = false; |
| std::stringstream ss; |
| // 1. init mem tracker |
| init_mem_tracker(); |
| thread_context()->thread_mem_tracker_mgr->init(); |
| #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \ |
| !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC) |
| init_hook(); |
| #endif |
| |
| // 2. init buffer pool |
| if (!BitUtil::IsPowerOf2(config::min_buffer_size)) { |
| ss << "Config min_buffer_size must be a power-of-two: " << config::min_buffer_size; |
| return Status::InternalError(ss.str()); |
| } |
| |
| // 3. init storage page cache |
| CacheManager::create_global_instance(); |
| |
| int64_t storage_cache_limit = |
| ParseUtil::parse_mem_spec(config::storage_page_cache_limit, MemInfo::mem_limit(), |
| MemInfo::physical_mem(), &is_percent); |
| while (!is_percent && storage_cache_limit > MemInfo::mem_limit() / 2) { |
| storage_cache_limit = storage_cache_limit / 2; |
| } |
| int32_t index_percentage = config::index_page_cache_percentage; |
| uint32_t num_shards = config::storage_page_cache_shard_size; |
| if ((num_shards & (num_shards - 1)) != 0) { |
| int old_num_shards = num_shards; |
| num_shards = BitUtil::RoundUpToPowerOfTwo(num_shards); |
| LOG(WARNING) << "num_shards should be power of two, but got " << old_num_shards |
| << ". Rounded up to " << num_shards |
| << ". Please modify the 'storage_page_cache_shard_size' parameter in your " |
| "conf file to be a power of two for better performance."; |
| } |
| int64_t pk_storage_page_cache_limit = |
| ParseUtil::parse_mem_spec(config::pk_storage_page_cache_limit, MemInfo::mem_limit(), |
| MemInfo::physical_mem(), &is_percent); |
| while (!is_percent && pk_storage_page_cache_limit > MemInfo::mem_limit() / 2) { |
| pk_storage_page_cache_limit = storage_cache_limit / 2; |
| } |
| StoragePageCache::create_global_cache(storage_cache_limit, index_percentage, |
| pk_storage_page_cache_limit, num_shards); |
| LOG(INFO) << "Storage page cache memory limit: " |
| << PrettyPrinter::print(storage_cache_limit, TUnit::BYTES) |
| << ", origin config value: " << config::storage_page_cache_limit; |
| |
| // Init row cache |
| int64_t row_cache_mem_limit = |
| ParseUtil::parse_mem_spec(config::row_cache_mem_limit, MemInfo::mem_limit(), |
| MemInfo::physical_mem(), &is_percent); |
| while (!is_percent && row_cache_mem_limit > MemInfo::mem_limit() / 2) { |
| // Reason same as buffer_pool_limit |
| row_cache_mem_limit = row_cache_mem_limit / 2; |
| } |
| RowCache::create_global_cache(row_cache_mem_limit); |
| LOG(INFO) << "Row cache memory limit: " |
| << PrettyPrinter::print(row_cache_mem_limit, TUnit::BYTES) |
| << ", origin config value: " << config::row_cache_mem_limit; |
| |
| uint64_t fd_number = config::min_file_descriptor_number; |
| struct rlimit l; |
| int ret = getrlimit(RLIMIT_NOFILE, &l); |
| if (ret != 0) { |
| LOG(WARNING) << "call getrlimit() failed. errno=" << strerror(errno) |
| << ", use default configuration instead."; |
| } else { |
| fd_number = static_cast<uint64_t>(l.rlim_cur); |
| } |
| // SegmentLoader caches segments in rowset granularity. So the size of |
| // opened files will greater than segment_cache_capacity. |
| int64_t segment_cache_capacity = config::segment_cache_capacity; |
| if (segment_cache_capacity < 0 || segment_cache_capacity > fd_number * 2 / 5) { |
| segment_cache_capacity = fd_number * 2 / 5; |
| } |
| LOG(INFO) << "segment_cache_capacity <= fd_number * 2 / 5, fd_number: " << fd_number |
| << " segment_cache_capacity: " << segment_cache_capacity; |
| SegmentLoader::create_global_instance(segment_cache_capacity); |
| |
| SchemaCache::create_global_instance(config::schema_cache_capacity); |
| |
| LookupConnectionCache::create_global_instance(config::lookup_connection_cache_bytes_limit); |
| |
| // use memory limit |
| int64_t inverted_index_cache_limit = |
| ParseUtil::parse_mem_spec(config::inverted_index_searcher_cache_limit, |
| MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent); |
| while (!is_percent && inverted_index_cache_limit > MemInfo::mem_limit() / 2) { |
| // Reason same as buffer_pool_limit |
| inverted_index_cache_limit = inverted_index_cache_limit / 2; |
| } |
| InvertedIndexSearcherCache::create_global_instance(inverted_index_cache_limit); |
| LOG(INFO) << "Inverted index searcher cache memory limit: " |
| << PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES) |
| << ", origin config value: " << config::inverted_index_searcher_cache_limit; |
| |
| // use memory limit |
| int64_t inverted_index_query_cache_limit = |
| ParseUtil::parse_mem_spec(config::inverted_index_query_cache_limit, |
| MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent); |
| while (!is_percent && inverted_index_query_cache_limit > MemInfo::mem_limit() / 2) { |
| // Reason same as buffer_pool_limit |
| inverted_index_query_cache_limit = inverted_index_query_cache_limit / 2; |
| } |
| InvertedIndexQueryCache::create_global_cache(inverted_index_query_cache_limit); |
| LOG(INFO) << "Inverted index query match cache memory limit: " |
| << PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES) |
| << ", origin config value: " << config::inverted_index_query_cache_limit; |
| |
| // 4. init other managers |
| RETURN_IF_ERROR(_block_spill_mgr->init()); |
| return Status::OK(); |
| } |
| |
| void ExecEnv::init_mem_tracker() { |
| _orphan_mem_tracker = |
| std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "Orphan"); |
| _orphan_mem_tracker_raw = _orphan_mem_tracker.get(); |
| _experimental_mem_tracker = std::make_shared<MemTrackerLimiter>( |
| MemTrackerLimiter::Type::EXPERIMENTAL, "ExperimentalSet"); |
| _page_no_cache_mem_tracker = |
| std::make_shared<MemTracker>("PageNoCache", _orphan_mem_tracker_raw); |
| _brpc_iobuf_block_memory_tracker = |
| std::make_shared<MemTracker>("IOBufBlockMemory", _orphan_mem_tracker_raw); |
| } |
| |
| void ExecEnv::init_download_cache_buf() { |
| std::unique_ptr<char[]> download_cache_buf(new char[config::download_cache_buffer_size]); |
| memset(download_cache_buf.get(), 0, config::download_cache_buffer_size); |
| _download_cache_buf_map[_serial_download_cache_thread_token.get()] = |
| std::move(download_cache_buf); |
| } |
| |
| void ExecEnv::init_download_cache_required_components() { |
| ThreadPoolBuilder("DownloadCacheThreadPool") |
| .set_min_threads(1) |
| .set_max_threads(config::download_cache_thread_pool_thread_num) |
| .set_max_queue_size(config::download_cache_thread_pool_queue_size) |
| .build(&_download_cache_thread_pool); |
| set_serial_download_cache_thread_token(); |
| init_download_cache_buf(); |
| } |
| |
| void ExecEnv::_register_metrics() { |
| REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num, |
| [this]() { return _send_batch_thread_pool->num_threads(); }); |
| |
| REGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size, |
| [this]() { return _send_batch_thread_pool->get_queue_size(); }); |
| |
| REGISTER_HOOK_METRIC(download_cache_thread_pool_thread_num, |
| [this]() { return _download_cache_thread_pool->num_threads(); }); |
| |
| REGISTER_HOOK_METRIC(download_cache_thread_pool_queue_size, |
| [this]() { return _download_cache_thread_pool->get_queue_size(); }); |
| } |
| |
| // Removes the hook metrics owned by ExecEnv; invoked at the start of _destroy(), before the |
| // thread pools read by the hooks are released. |
| // NOTE(review): scanner_thread_pool_queue_size is deregistered here although |
| // _register_metrics() in this file never registers it -- presumably it is registered |
| // elsewhere or is a leftover; confirm. |
| void ExecEnv::_deregister_metrics() { |
| DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size); |
| DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num); |
| DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size); |
| DEREGISTER_HOOK_METRIC(download_cache_thread_pool_thread_num); |
| DEREGISTER_HOOK_METRIC(download_cache_thread_pool_queue_size); |
| } |
| |
| void ExecEnv::_destroy() { |
| //Only destroy once after init |
| if (!_is_init) { |
| return; |
| } |
| |
| _deregister_metrics(); |
| SAFE_DELETE(_internal_client_cache); |
| SAFE_DELETE(_function_client_cache); |
| SAFE_DELETE(_load_channel_mgr); |
| SAFE_DELETE(_broker_mgr); |
| SAFE_DELETE(_bfd_parser); |
| SAFE_DELETE(_load_path_mgr); |
| SAFE_DELETE(_pipeline_task_scheduler); |
| SAFE_DELETE(_pipeline_task_group_scheduler); |
| SAFE_DELETE(_task_group_manager); |
| SAFE_DELETE(_fragment_mgr); |
| SAFE_DELETE(_broker_client_cache); |
| SAFE_DELETE(_frontend_client_cache); |
| SAFE_DELETE(_backend_client_cache); |
| SAFE_DELETE(_result_mgr); |
| SAFE_DELETE(_result_queue_mgr); |
| SAFE_DELETE(_routine_load_task_executor); |
| SAFE_DELETE(_external_scan_context_mgr); |
| SAFE_DELETE(_heartbeat_flags); |
| SAFE_DELETE(_scanner_scheduler); |
| SAFE_DELETE(_file_meta_cache); |
| // Master Info is a thrift object, it could be the last one to deconstruct. |
| // Master info should be deconstruct later than fragment manager, because fragment will |
| // access master_info.backend id to access some info. If there is a running query and master |
| // info is deconstructed then BE process will core at coordinator back method in fragment mgr. |
| SAFE_DELETE(_master_info); |
| |
| _new_load_stream_mgr.reset(); |
| _send_batch_thread_pool.reset(nullptr); |
| _buffered_reader_prefetch_thread_pool.reset(nullptr); |
| _send_report_thread_pool.reset(nullptr); |
| _join_node_thread_pool.reset(nullptr); |
| _serial_download_cache_thread_token.reset(nullptr); |
| _download_cache_thread_pool.reset(nullptr); |
| _orphan_mem_tracker.reset(); |
| _experimental_mem_tracker.reset(); |
| _page_no_cache_mem_tracker.reset(); |
| _brpc_iobuf_block_memory_tracker.reset(); |
| InvertedIndexSearcherCache::reset_global_instance(); |
| |
| SAFE_DELETE(_runtime_query_statistics_mgr); |
| |
| // dns cache is a global instance and need to be released at last |
| SAFE_DELETE(_dns_cache); |
| _is_init = false; |
| } |
| |
| void ExecEnv::destroy(ExecEnv* env) { |
| env->_destroy(); |
| } |
| |
| } // namespace doris |