| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <gen_cpp/internal_service.pb.h> |
| #include <stdint.h> |
| #include <time.h> |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <functional> |
| #include <map> |
| #include <memory> |
| #include <mutex> |
| #include <ostream> |
| #include <string> |
| #include <unordered_map> |
| #include <unordered_set> |
| #include <utility> |
| #include <vector> |
| |
| #include "common/status.h" |
| #include "runtime/memory/mem_tracker.h" |
| #include "runtime/tablets_channel.h" |
| #include "util/runtime_profile.h" |
| #include "util/spinlock.h" |
| #include "util/thrift_util.h" |
| #include "util/uid_util.h" |
| //#include <gen_cpp/internal_service.pb.h> |
| |
| namespace doris { |
| |
| class PTabletWriterOpenRequest; |
| class OpenPartitionRequest; |
| |
| // A LoadChannel manages tablets channels for all indexes |
| // corresponding to a certain load job |
| class LoadChannel { |
| public: |
| LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTracker> mem_tracker, int64_t timeout_s, |
| bool is_high_priority, const std::string& sender_ip, int64_t backend_id, |
| bool enable_profile); |
| ~LoadChannel(); |
| |
| // open a new load channel if not exist |
| Status open(const PTabletWriterOpenRequest& request); |
| |
| // this batch must belong to a index in one transaction |
| Status add_batch(const PTabletWriterAddBlockRequest& request, |
| PTabletWriterAddBlockResult* response); |
| |
| // return true if this load channel has been opened and all tablets channels are closed then. |
| bool is_finished(); |
| |
| Status cancel(); |
| |
| time_t last_updated_time() const { return _last_updated_time.load(); } |
| |
| const UniqueId& load_id() const { return _load_id; } |
| |
| int64_t mem_consumption() { |
| int64_t mem_usage = 0; |
| { |
| std::lock_guard<SpinLock> l(_tablets_channels_lock); |
| for (auto& it : _tablets_channels) { |
| mem_usage += it.second->mem_consumption(); |
| } |
| } |
| _mem_tracker->set_consumption(mem_usage); |
| return mem_usage; |
| } |
| |
| void get_active_memtable_mem_consumption( |
| std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t, std::greater<int64_t>>>>* |
| writers_mem_snap) { |
| std::lock_guard<SpinLock> l(_tablets_channels_lock); |
| for (auto& it : _tablets_channels) { |
| std::multimap<int64_t, int64_t, std::greater<int64_t>> tablets_channel_mem; |
| it.second->get_active_memtable_mem_consumption(&tablets_channel_mem); |
| writers_mem_snap->emplace_back(it.first, std::move(tablets_channel_mem)); |
| } |
| } |
| |
| int64_t timeout() const { return _timeout_s; } |
| |
| bool is_high_priority() const { return _is_high_priority; } |
| |
| void flush_memtable_async(int64_t index_id, int64_t tablet_id) { |
| std::lock_guard<std::mutex> l(_lock); |
| auto it = _tablets_channels.find(index_id); |
| if (it != _tablets_channels.end()) { |
| it->second->flush_memtable_async(tablet_id); |
| } |
| } |
| |
| void wait_flush(int64_t index_id, int64_t tablet_id) { |
| std::lock_guard<std::mutex> l(_lock); |
| auto it = _tablets_channels.find(index_id); |
| if (it != _tablets_channels.end()) { |
| it->second->wait_flush(tablet_id); |
| } |
| } |
| |
| RuntimeProfile::Counter* get_mgr_add_batch_timer() { return _mgr_add_batch_timer; } |
| RuntimeProfile::Counter* get_handle_mem_limit_timer() { return _handle_mem_limit_timer; } |
| |
| protected: |
| Status _get_tablets_channel(std::shared_ptr<TabletsChannel>& channel, bool& is_finished, |
| const int64_t index_id); |
| |
| Status _handle_eos(std::shared_ptr<TabletsChannel>& channel, |
| const PTabletWriterAddBlockRequest& request, |
| PTabletWriterAddBlockResult* response) { |
| _self_profile->add_info_string("EosHost", fmt::format("{}", request.backend_id())); |
| bool finished = false; |
| auto index_id = request.index_id(); |
| RETURN_IF_ERROR(channel->close( |
| this, request.sender_id(), request.backend_id(), &finished, request.partition_ids(), |
| response->mutable_tablet_vec(), response->mutable_tablet_errors(), |
| request.slave_tablet_nodes(), response->mutable_success_slave_tablet_node_ids(), |
| request.write_single_replica())); |
| if (finished) { |
| std::lock_guard<std::mutex> l(_lock); |
| { |
| std::lock_guard<SpinLock> l(_tablets_channels_lock); |
| _tablets_channels.erase(index_id); |
| } |
| _finished_channel_ids.emplace(index_id); |
| } |
| return Status::OK(); |
| } |
| |
| void _init_profile(); |
| // thread safety |
| void _report_profile(PTabletWriterAddBlockResult* response); |
| |
| private: |
| UniqueId _load_id; |
| // Tracks the total memory consumed by current load job on this BE |
| std::unique_ptr<MemTracker> _mem_tracker; |
| |
| SpinLock _profile_serialize_lock; |
| std::unique_ptr<RuntimeProfile> _profile; |
| RuntimeProfile* _self_profile; |
| RuntimeProfile::Counter* _add_batch_number_counter = nullptr; |
| RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr; |
| RuntimeProfile::Counter* _add_batch_timer = nullptr; |
| RuntimeProfile::Counter* _add_batch_times = nullptr; |
| RuntimeProfile::Counter* _mgr_add_batch_timer = nullptr; |
| RuntimeProfile::Counter* _handle_mem_limit_timer = nullptr; |
| RuntimeProfile::Counter* _handle_eos_timer = nullptr; |
| |
| // lock protect the tablets channel map |
| std::mutex _lock; |
| // index id -> tablets channel |
| std::unordered_map<int64_t, std::shared_ptr<TabletsChannel>> _tablets_channels; |
| SpinLock _tablets_channels_lock; |
| // This is to save finished channels id, to handle the retry request. |
| std::unordered_set<int64_t> _finished_channel_ids; |
| // set to true if at least one tablets channel has been opened |
| bool _opened = false; |
| |
| std::atomic<time_t> _last_updated_time; |
| |
| // the timeout of this load job. |
| // Timed out channels will be periodically deleted by LoadChannelMgr. |
| int64_t _timeout_s; |
| |
| // true if this is a high priority load task |
| bool _is_high_priority = false; |
| |
| // the ip where tablet sink locate |
| std::string _sender_ip; |
| |
| int64_t _backend_id; |
| |
| bool _enable_profile; |
| }; |
| |
| inline std::ostream& operator<<(std::ostream& os, LoadChannel& load_channel) { |
| os << "LoadChannel(id=" << load_channel.load_id() << ", mem=" << load_channel.mem_consumption() |
| << ", last_update_time=" << static_cast<uint64_t>(load_channel.last_updated_time()) |
| << ", is high priority: " << load_channel.is_high_priority() << ")"; |
| return os; |
| } |
| |
| } // namespace doris |