// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
| |
#include "runtime/load_channel_mgr.h"

#include <fmt/format.h>
#include <gen_cpp/internal_service.pb.h>

#include <algorithm>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <ctime>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <vector>

#include "common/config.h"
#include "common/logging.h"
#include "runtime/exec_env.h"
#include "runtime/load_channel.h"
#include "util/doris_metrics.h"
#include "util/metrics.h"
#include "util/thread.h"
| |
| namespace doris { |
| |
| #ifndef BE_TEST |
| constexpr uint32_t START_BG_INTERVAL = 60; |
| #else |
| constexpr uint32_t START_BG_INTERVAL = 1; |
| #endif |
| |
| DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(load_channel_count, MetricUnit::NOUNIT); |
| DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(load_channel_mem_consumption, MetricUnit::BYTES, "", |
| mem_consumption, Labels({{"type", "load"}})); |
| |
| static int64_t calc_channel_timeout_s(int64_t timeout_in_req_s) { |
| int64_t load_channel_timeout_s = config::streaming_load_rpc_max_alive_time_sec; |
| if (timeout_in_req_s > 0) { |
| load_channel_timeout_s = std::max<int64_t>(load_channel_timeout_s, timeout_in_req_s); |
| } |
| return load_channel_timeout_s; |
| } |
| |
| LoadChannelMgr::LoadChannelMgr() : _stop_background_threads_latch(1) { |
| REGISTER_HOOK_METRIC(load_channel_count, [this]() { |
| // std::lock_guard<std::mutex> l(_lock); |
| return _load_channels.size(); |
| }); |
| } |
| |
| void LoadChannelMgr::stop() { |
| DEREGISTER_HOOK_METRIC(load_channel_count); |
| DEREGISTER_HOOK_METRIC(load_channel_mem_consumption); |
| _stop_background_threads_latch.count_down(); |
| if (_load_channels_clean_thread) { |
| _load_channels_clean_thread->join(); |
| } |
| } |
| |
| Status LoadChannelMgr::init(int64_t process_mem_limit) { |
| _last_success_channels = std::make_unique<LastSuccessChannelCache>(1024); |
| RETURN_IF_ERROR(_start_bg_worker()); |
| return Status::OK(); |
| } |
| |
| Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { |
| UniqueId load_id(params.id()); |
| std::shared_ptr<LoadChannel> channel; |
| { |
| std::lock_guard<std::mutex> l(_lock); |
| auto it = _load_channels.find(load_id); |
| if (it != _load_channels.end()) { |
| channel = it->second; |
| } else { |
| // create a new load channel |
| int64_t timeout_in_req_s = |
| params.has_load_channel_timeout_s() ? params.load_channel_timeout_s() : -1; |
| int64_t channel_timeout_s = calc_channel_timeout_s(timeout_in_req_s); |
| bool is_high_priority = (params.has_is_high_priority() && params.is_high_priority()); |
| |
| int64_t wg_id = -1; |
| if (params.has_workload_group_id()) { |
| wg_id = params.workload_group_id(); |
| } |
| channel.reset(new LoadChannel(load_id, channel_timeout_s, is_high_priority, |
| params.sender_ip(), params.backend_id(), |
| params.enable_profile(), wg_id)); |
| _load_channels.insert({load_id, channel}); |
| } |
| } |
| |
| RETURN_IF_ERROR(channel->open(params)); |
| |
| return Status::OK(); |
| } |
| |
| Status LoadChannelMgr::_get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& is_eof, |
| const UniqueId& load_id, |
| const PTabletWriterAddBlockRequest& request) { |
| is_eof = false; |
| std::lock_guard<std::mutex> l(_lock); |
| auto it = _load_channels.find(load_id); |
| if (it == _load_channels.end()) { |
| auto* handle = _last_success_channels->lookup(load_id.to_string()); |
| // success only when eos be true |
| if (handle != nullptr) { |
| _last_success_channels->release(handle); |
| if (request.has_eos() && request.eos()) { |
| is_eof = true; |
| return Status::OK(); |
| } |
| } |
| return Status::InternalError<false>( |
| "Fail to add batch in load channel: unknown load_id={}. " |
| "This may be due to a BE restart. Please retry the load.", |
| load_id.to_string()); |
| } |
| channel = it->second; |
| return Status::OK(); |
| } |
| |
| Status LoadChannelMgr::add_batch(const PTabletWriterAddBlockRequest& request, |
| PTabletWriterAddBlockResult* response) { |
| UniqueId load_id(request.id()); |
| // 1. get load channel |
| std::shared_ptr<LoadChannel> channel; |
| bool is_eof; |
| auto status = _get_load_channel(channel, is_eof, load_id, request); |
| if (!status.ok() || is_eof) { |
| return status; |
| } |
| SCOPED_TIMER(channel->get_mgr_add_batch_timer()); |
| |
| if (!channel->is_high_priority()) { |
| // 2. check if mem consumption exceed limit |
| // If this is a high priority load task, do not handle this. |
| // because this may block for a while, which may lead to rpc timeout. |
| SCOPED_TIMER(channel->get_handle_mem_limit_timer()); |
| ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush( |
| [channel]() { return channel->is_cancelled(); }); |
| if (channel->is_cancelled()) { |
| return Status::Cancelled("LoadChannel has been cancelled: {}.", load_id.to_string()); |
| } |
| } |
| |
| // 3. add batch to load channel |
| // batch may not exist in request(eg: eos request without batch), |
| // this case will be handled in load channel's add batch method. |
| Status st = channel->add_batch(request, response); |
| if (UNLIKELY(!st.ok())) { |
| RETURN_IF_ERROR(channel->cancel()); |
| return st; |
| } |
| |
| // 4. handle finish |
| if (channel->is_finished()) { |
| _finish_load_channel(load_id); |
| } |
| return Status::OK(); |
| } |
| |
| void LoadChannelMgr::_finish_load_channel(const UniqueId load_id) { |
| VLOG_NOTICE << "removing load channel " << load_id << " because it's finished"; |
| { |
| std::lock_guard<std::mutex> l(_lock); |
| if (_load_channels.find(load_id) != _load_channels.end()) { |
| _load_channels.erase(load_id); |
| } |
| auto* handle = _last_success_channels->insert(load_id.to_string(), nullptr, 1, 1); |
| _last_success_channels->release(handle); |
| } |
| VLOG_CRITICAL << "removed load channel " << load_id; |
| } |
| |
| Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) { |
| UniqueId load_id(params.id()); |
| std::shared_ptr<LoadChannel> cancelled_channel; |
| { |
| std::lock_guard<std::mutex> l(_lock); |
| if (_load_channels.find(load_id) != _load_channels.end()) { |
| cancelled_channel = _load_channels[load_id]; |
| _load_channels.erase(load_id); |
| } |
| } |
| |
| if (cancelled_channel != nullptr) { |
| RETURN_IF_ERROR(cancelled_channel->cancel()); |
| LOG(INFO) << "load channel has been cancelled: " << load_id; |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status LoadChannelMgr::_start_bg_worker() { |
| RETURN_IF_ERROR(Thread::create( |
| "LoadChannelMgr", "cancel_timeout_load_channels", |
| [this]() { |
| while (!_stop_background_threads_latch.wait_for( |
| std::chrono::seconds(START_BG_INTERVAL))) { |
| static_cast<void>(_start_load_channels_clean()); |
| } |
| }, |
| &_load_channels_clean_thread)); |
| |
| return Status::OK(); |
| } |
| |
| Status LoadChannelMgr::_start_load_channels_clean() { |
| std::vector<std::shared_ptr<LoadChannel>> need_delete_channels; |
| LOG(INFO) << "cleaning timed out load channels"; |
| time_t now = time(nullptr); |
| { |
| std::vector<UniqueId> need_delete_channel_ids; |
| std::lock_guard<std::mutex> l(_lock); |
| int i = 0; |
| for (auto& kv : _load_channels) { |
| VLOG_CRITICAL << "load channel[" << i++ << "]: " << *(kv.second); |
| time_t last_updated_time = kv.second->last_updated_time(); |
| if (difftime(now, last_updated_time) >= kv.second->timeout()) { |
| need_delete_channel_ids.emplace_back(kv.first); |
| need_delete_channels.emplace_back(kv.second); |
| } |
| } |
| |
| for (auto& key : need_delete_channel_ids) { |
| _load_channels.erase(key); |
| LOG(INFO) << "erase timeout load channel: " << key; |
| } |
| } |
| |
| // we must cancel these load channels before destroying them. |
| // otherwise some object may be invalid before trying to visit it. |
| // eg: MemTracker in load channel |
| for (auto& channel : need_delete_channels) { |
| RETURN_IF_ERROR(channel->cancel()); |
| LOG(INFO) << "load channel has been safely deleted: " << channel->load_id() |
| << ", timeout(s): " << channel->timeout(); |
| } |
| |
| return Status::OK(); |
| } |
| } // namespace doris |