| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
#include <glog/logging.h>

#include <atomic>
#include <cstdint>
#include <memory>
#include <mutex>
#include <ostream>
#include <set>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "common/status.h"
#include "util/bitmap.h"
#include "util/runtime_profile.h"
#include "util/uid_util.h"
#include "vec/common/custom_allocator.h"
| |
// Forward declarations of the protobuf container templates referenced by this header,
// so the protobuf headers themselves do not have to be included here.
namespace google::protobuf {
template <typename Key, typename T>
class Map;
template <typename Element>
class RepeatedField;
template <typename T>
class RepeatedPtrField;
} // namespace google::protobuf
| |
| namespace doris { |
// Forward declarations (alphabetical). Only names are introduced here; the full
// definitions live in other headers.
class OpenPartitionRequest;
class PSlaveTabletNodes;
class PSuccessSlaveTabletNodeIds;
class PTabletError;
class PTabletInfo;
class PTabletWriterAddBlockRequest;
class PTabletWriterAddBlockResult;
class PTabletWriterOpenRequest;
class PTabletWriterOpenResult;
class PUniqueId;
class StorageEngine;
class TupleDescriptor;
| |
| struct TabletsChannelKey { |
| UniqueId id; |
| int64_t index_id; |
| |
| TabletsChannelKey(const PUniqueId& pid, int64_t index_id_) : id(pid), index_id(index_id_) {} |
| |
| ~TabletsChannelKey() noexcept = default; |
| |
| bool operator==(const TabletsChannelKey& rhs) const noexcept { |
| return index_id == rhs.index_id && id == rhs.id; |
| } |
| |
| std::string to_string() const; |
| }; |
| |
| std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key); |
| |
// Forward declarations for the types the channel classes below refer to by
// pointer, reference or smart pointer.
struct WriteRequest;
class BaseDeltaWriter;
class LoadChannel;
class MemTableWriter;
class OlapTableSchemaParam;
| |
| // Write channel for a particular (load, index). |
| class BaseTabletsChannel { |
| public: |
| BaseTabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id, bool is_high_priority, |
| RuntimeProfile* profile); |
| |
| virtual ~BaseTabletsChannel(); |
| |
| Status open(const PTabletWriterOpenRequest& request); |
| // open + open writers |
| Status incremental_open(const PTabletWriterOpenRequest& params); |
| |
| virtual std::unique_ptr<BaseDeltaWriter> create_delta_writer(const WriteRequest& request) = 0; |
| |
| // no-op when this channel has been closed or cancelled |
| virtual Status add_batch(const PTabletWriterAddBlockRequest& request, |
| PTabletWriterAddBlockResult* response) = 0; |
| |
| // Mark sender with 'sender_id' as closed. |
| // If all senders are closed, close this channel, set '*finished' to true, update 'tablet_vec' |
| // to include all tablets written in this channel. |
| // no-op when this channel has been closed or cancelled |
| virtual Status close(LoadChannel* parent, const PTabletWriterAddBlockRequest& req, |
| PTabletWriterAddBlockResult* res, bool* finished) = 0; |
| |
| // no-op when this channel has been closed or cancelled |
| virtual Status cancel(); |
| |
| void refresh_profile(); |
| |
| size_t total_received_rows() const { return _total_received_rows; } |
| |
| size_t num_rows_filtered() const { return _num_rows_filtered; } |
| |
| // means this tablets in this BE is incremental opened partitions. |
| bool is_incremental_channel() const { return _open_by_incremental; } |
| |
| bool is_finished() const { return _state == kFinished; } |
| |
| protected: |
| Status _write_block_data(const PTabletWriterAddBlockRequest& request, int64_t cur_seq, |
| std::unordered_map<int64_t, DorisVector<uint32_t>>& tablet_to_rowidxs, |
| PTabletWriterAddBlockResult* response); |
| |
| Status _get_current_seq(int64_t& cur_seq, const PTabletWriterAddBlockRequest& request); |
| |
| // open all writer |
| Status _open_all_writers(const PTabletWriterOpenRequest& request); |
| |
| void _add_broken_tablet(int64_t tablet_id); |
| // thread-unsafe, add a shared lock for `_tablet_writers_lock` if needed |
| bool _is_broken_tablet(int64_t tablet_id) const; |
| void _add_error_tablet(google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, |
| int64_t tablet_id, Status error) const; |
| void _build_tablet_to_rowidxs( |
| const PTabletWriterAddBlockRequest& request, |
| std::unordered_map<int64_t /* tablet_id */, DorisVector<uint32_t> /* row index */>* |
| tablet_to_rowidxs); |
| virtual void _init_profile(RuntimeProfile* profile); |
| |
| // id of this load channel |
| TabletsChannelKey _key; |
| |
| // protect _state change. open and close. when add_batch finished, lock to change _next_seqs also |
| std::mutex _lock; |
| enum State { |
| kInitialized, |
| kOpened, |
| kFinished // closed or cancelled |
| }; |
| State _state; |
| |
| UniqueId _load_id; |
| |
| // initialized in open function |
| int64_t _txn_id = -1; |
| int64_t _index_id = -1; |
| std::shared_ptr<OlapTableSchemaParam> _schema; |
| TupleDescriptor* _tuple_desc = nullptr; |
| bool _open_by_incremental = false; |
| |
| // next sequence we expect |
| std::set<int32_t> _recieved_senders; |
| int _num_remaining_senders = 0; |
| std::vector<int64_t> _next_seqs; |
| Bitmap _closed_senders; |
| // status to return when operate on an already closed/cancelled channel |
| // currently it's OK. |
| Status _close_status; |
| |
| // tablet_id -> TabletChannel. it will only be changed in open() or inc_open() |
| std::unordered_map<int64_t, std::unique_ptr<BaseDeltaWriter>> _tablet_writers; |
| // protect _tablet_writers |
| std::mutex _tablet_writers_lock; |
| // broken tablet ids. |
| // If a tablet write fails, it's id will be added to this set. |
| // So that following batch will not handle this tablet anymore. |
| std::unordered_set<int64_t> _broken_tablets; |
| |
| std::shared_mutex _broken_tablets_lock; |
| |
| std::unordered_set<int64_t> _reducing_tablets; |
| |
| std::unordered_set<int64_t> _partition_ids; |
| |
| static std::atomic<uint64_t> _s_tablet_writer_count; |
| |
| bool _is_high_priority = false; |
| |
| RuntimeProfile* _profile = nullptr; |
| RuntimeProfile::Counter* _add_batch_number_counter = nullptr; |
| RuntimeProfile::HighWaterMarkCounter* _memory_usage_counter = nullptr; |
| RuntimeProfile::HighWaterMarkCounter* _write_memory_usage_counter = nullptr; |
| RuntimeProfile::HighWaterMarkCounter* _flush_memory_usage_counter = nullptr; |
| RuntimeProfile::HighWaterMarkCounter* _max_tablet_memory_usage_counter = nullptr; |
| RuntimeProfile::HighWaterMarkCounter* _max_tablet_write_memory_usage_counter = nullptr; |
| RuntimeProfile::HighWaterMarkCounter* _max_tablet_flush_memory_usage_counter = nullptr; |
| RuntimeProfile::Counter* _add_batch_timer = nullptr; |
| RuntimeProfile::Counter* _write_block_timer = nullptr; |
| RuntimeProfile::Counter* _incremental_open_timer = nullptr; |
| |
| // record rows received and filtered |
| size_t _total_received_rows = 0; |
| size_t _num_rows_filtered = 0; |
| }; |
| |
| class DeltaWriter; |
| |
| // `StorageEngine` mixin for `BaseTabletsChannel` |
| class TabletsChannel final : public BaseTabletsChannel { |
| public: |
| TabletsChannel(StorageEngine& engine, const TabletsChannelKey& key, const UniqueId& load_id, |
| bool is_high_priority, RuntimeProfile* profile); |
| |
| ~TabletsChannel() override; |
| |
| std::unique_ptr<BaseDeltaWriter> create_delta_writer(const WriteRequest& request) override; |
| |
| Status add_batch(const PTabletWriterAddBlockRequest& request, |
| PTabletWriterAddBlockResult* response) override; |
| |
| Status close(LoadChannel* parent, const PTabletWriterAddBlockRequest& req, |
| PTabletWriterAddBlockResult* res, bool* finished) override; |
| |
| Status cancel() override; |
| |
| private: |
| void _init_profile(RuntimeProfile* profile) override; |
| |
| // deal with DeltaWriter commit_txn(), add tablet to list for return. |
| void _commit_txn(DeltaWriter* writer, const PTabletWriterAddBlockRequest& req, |
| PTabletWriterAddBlockResult* res); |
| |
| StorageEngine& _engine; |
| bool _write_single_replica = false; |
| RuntimeProfile::Counter* _slave_replica_timer = nullptr; |
| }; |
| |
| } // namespace doris |