| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <brpc/controller.h> |
| #include <bthread/bthread.h> |
| #include <gen_cpp/cloud.pb.h> |
| #include <google/protobuf/message.h> |
| #include <google/protobuf/service.h> |
| |
| #include <chrono> |
| #include <random> |
| #include <type_traits> |
| |
| #include "common/config.h" |
| #include "common/stats.h" |
| #include "cpp/sync_point.h" |
| #include "meta-service/delete_bitmap_lock_white_list.h" |
| #include "meta-service/txn_lazy_committer.h" |
| #include "meta-store/txn_kv.h" |
| #include "rate-limiter/rate_limiter.h" |
| #include "resource-manager/resource_manager.h" |
| #include "snapshot/snapshot_manager.h" |
| |
| namespace doris::cloud { |
| |
| class Transaction; |
| |
| constexpr std::string_view BUILT_IN_STORAGE_VAULT_NAME = "built_in_storage_vault"; |
| static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1; |
| static constexpr int SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID = -2; |
| static constexpr int COMPACTION_WITHOUT_LOCK_DELETE_BITMAP_LOCK_ID = -3; |
| |
| void internal_get_rowset(Transaction* txn, int64_t start, int64_t end, |
| const std::string& instance_id, int64_t tablet_id, MetaServiceCode& code, |
| std::string& msg, GetRowsetResponse* response); |
| |
| // for wrapping stateful lambda to run in bthread |
| static void* run_bthread_work(void* arg) { |
| auto f = reinterpret_cast<std::function<void()>*>(arg); |
| (*f)(); |
| delete f; |
| return nullptr; |
| } |
| |
| [[maybe_unused]] inline static bool is_job_delete_bitmap_lock_id(int64_t lock_id) { |
| return lock_id == COMPACTION_DELETE_BITMAP_LOCK_ID || |
| lock_id == SCHEMA_CHANGE_DELETE_BITMAP_LOCK_ID; |
| } |
| |
| [[maybe_unused]] void record_txn_commit_stats(doris::cloud::Transaction* txn, |
| const std::string& instance_id, |
| int64_t partition_count, int64_t tablet_count, |
| int64_t txn_id); |
| |
| class MetaServiceImpl : public cloud::MetaService { |
| public: |
| MetaServiceImpl(std::shared_ptr<TxnKv> txn_kv, std::shared_ptr<ResourceManager> resource_mgr, |
| std::shared_ptr<RateLimiter> rate_controller, |
| std::shared_ptr<SnapshotManager> snapshot_manager); |
| ~MetaServiceImpl() override; |
| |
| [[nodiscard]] const std::shared_ptr<TxnKv>& txn_kv() const { return txn_kv_; } |
| [[nodiscard]] const std::shared_ptr<RateLimiter>& rate_limiter() const { return rate_limiter_; } |
| [[nodiscard]] const std::shared_ptr<ResourceManager>& resource_mgr() const { |
| return resource_mgr_; |
| } |
| |
| [[nodiscard]] const std::shared_ptr<TxnLazyCommitter>& txn_lazy_committer() const { |
| return txn_lazy_committer_; |
| } |
| |
| [[nodiscard]] const std::shared_ptr<SnapshotManager>& snapshot_manager() const { |
| return snapshot_manager_; |
| } |
| |
| void begin_txn(::google::protobuf::RpcController* controller, const BeginTxnRequest* request, |
| BeginTxnResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void precommit_txn(::google::protobuf::RpcController* controller, |
| const PrecommitTxnRequest* request, PrecommitTxnResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void commit_txn(::google::protobuf::RpcController* controller, const CommitTxnRequest* request, |
| CommitTxnResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void abort_txn(::google::protobuf::RpcController* controller, const AbortTxnRequest* request, |
| AbortTxnResponse* response, ::google::protobuf::Closure* done) override; |
| |
| // clang-format off |
| void get_txn(::google::protobuf::RpcController* controller, |
| const GetTxnRequest* request, |
| GetTxnResponse* response, |
| ::google::protobuf::Closure* done) override; |
| // clang-format on |
| |
| void get_current_max_txn_id(::google::protobuf::RpcController* controller, |
| const GetCurrentMaxTxnRequest* request, |
| GetCurrentMaxTxnResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void begin_sub_txn(::google::protobuf::RpcController* controller, |
| const BeginSubTxnRequest* request, BeginSubTxnResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void abort_sub_txn(::google::protobuf::RpcController* controller, |
| const AbortSubTxnRequest* request, AbortSubTxnResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void check_txn_conflict(::google::protobuf::RpcController* controller, |
| const CheckTxnConflictRequest* request, |
| CheckTxnConflictResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void abort_txn_with_coordinator(::google::protobuf::RpcController* controller, |
| const AbortTxnWithCoordinatorRequest* request, |
| AbortTxnWithCoordinatorResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void clean_txn_label(::google::protobuf::RpcController* controller, |
| const CleanTxnLabelRequest* request, CleanTxnLabelResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void get_version(::google::protobuf::RpcController* controller, |
| const GetVersionRequest* request, GetVersionResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void batch_get_version(::google::protobuf::RpcController* controller, |
| const GetVersionRequest* request, GetVersionResponse* response, |
| ::google::protobuf::Closure* done); |
| |
| void create_tablets(::google::protobuf::RpcController* controller, |
| const CreateTabletsRequest* request, CreateTabletsResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void update_tablet(::google::protobuf::RpcController* controller, |
| const UpdateTabletRequest* request, UpdateTabletResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void get_tablet(::google::protobuf::RpcController* controller, const GetTabletRequest* request, |
| GetTabletResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void prepare_rowset(::google::protobuf::RpcController* controller, |
| const CreateRowsetRequest* request, CreateRowsetResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void commit_rowset(::google::protobuf::RpcController* controller, |
| const CreateRowsetRequest* request, CreateRowsetResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void update_tmp_rowset(::google::protobuf::RpcController* controller, |
| const CreateRowsetRequest* request, CreateRowsetResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void get_rowset(::google::protobuf::RpcController* controller, const GetRowsetRequest* request, |
| GetRowsetResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void prepare_index(::google::protobuf::RpcController* controller, const IndexRequest* request, |
| IndexResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void commit_index(::google::protobuf::RpcController* controller, const IndexRequest* request, |
| IndexResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void drop_index(::google::protobuf::RpcController* controller, const IndexRequest* request, |
| IndexResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void check_kv(::google::protobuf::RpcController* controller, const CheckKVRequest* request, |
| CheckKVResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void prepare_partition(::google::protobuf::RpcController* controller, |
| const PartitionRequest* request, PartitionResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void commit_partition(::google::protobuf::RpcController* controller, |
| const PartitionRequest* request, PartitionResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void drop_partition(::google::protobuf::RpcController* controller, |
| const PartitionRequest* request, PartitionResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void prepare_restore_job(::google::protobuf::RpcController* controller, |
| const RestoreJobRequest* request, RestoreJobResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void commit_restore_job(::google::protobuf::RpcController* controller, |
| const RestoreJobRequest* request, RestoreJobResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void finish_restore_job(::google::protobuf::RpcController* controller, |
| const RestoreJobRequest* request, RestoreJobResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void get_tablet_stats(::google::protobuf::RpcController* controller, |
| const GetTabletStatsRequest* request, GetTabletStatsResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void start_tablet_job(::google::protobuf::RpcController* controller, |
| const StartTabletJobRequest* request, StartTabletJobResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void finish_tablet_job(::google::protobuf::RpcController* controller, |
| const FinishTabletJobRequest* request, FinishTabletJobResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void http(::google::protobuf::RpcController* controller, const MetaServiceHttpRequest* request, |
| MetaServiceHttpResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void get_obj_store_info(google::protobuf::RpcController* controller, |
| const GetObjStoreInfoRequest* request, |
| GetObjStoreInfoResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void alter_obj_store_info(google::protobuf::RpcController* controller, |
| const AlterObjStoreInfoRequest* request, |
| AlterObjStoreInfoResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void alter_storage_vault(google::protobuf::RpcController* controller, |
| const AlterObjStoreInfoRequest* request, |
| AlterObjStoreInfoResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void update_ak_sk(google::protobuf::RpcController* controller, const UpdateAkSkRequest* request, |
| UpdateAkSkResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void create_instance(google::protobuf::RpcController* controller, |
| const CreateInstanceRequest* request, CreateInstanceResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void alter_instance(google::protobuf::RpcController* controller, |
| const AlterInstanceRequest* request, AlterInstanceResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void get_instance(google::protobuf::RpcController* controller, |
| const GetInstanceRequest* request, GetInstanceResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void alter_cluster(google::protobuf::RpcController* controller, |
| const AlterClusterRequest* request, AlterClusterResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void get_cluster(google::protobuf::RpcController* controller, const GetClusterRequest* request, |
| GetClusterResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void create_stage(google::protobuf::RpcController* controller, |
| const CreateStageRequest* request, CreateStageResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void get_stage(google::protobuf::RpcController* controller, const GetStageRequest* request, |
| GetStageResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void drop_stage(google::protobuf::RpcController* controller, const DropStageRequest* request, |
| DropStageResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void get_iam(google::protobuf::RpcController* controller, const GetIamRequest* request, |
| GetIamResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void alter_iam(google::protobuf::RpcController* controller, const AlterIamRequest* request, |
| AlterIamResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void alter_ram_user(google::protobuf::RpcController* controller, |
| const AlterRamUserRequest* request, AlterRamUserResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void begin_copy(google::protobuf::RpcController* controller, const BeginCopyRequest* request, |
| BeginCopyResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void finish_copy(google::protobuf::RpcController* controller, const FinishCopyRequest* request, |
| FinishCopyResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void get_copy_job(google::protobuf::RpcController* controller, const GetCopyJobRequest* request, |
| GetCopyJobResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void get_copy_files(google::protobuf::RpcController* controller, |
| const GetCopyFilesRequest* request, GetCopyFilesResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| // filter files that are loading or loaded in the input files, return files that are not loaded |
| void filter_copy_files(google::protobuf::RpcController* controller, |
| const FilterCopyFilesRequest* request, FilterCopyFilesResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void update_delete_bitmap(google::protobuf::RpcController* controller, |
| const UpdateDeleteBitmapRequest* request, |
| UpdateDeleteBitmapResponse* response, |
| ::google::protobuf::Closure* done) override; |
| void get_delete_bitmap(google::protobuf::RpcController* controller, |
| const GetDeleteBitmapRequest* request, GetDeleteBitmapResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void get_delete_bitmap_update_lock(google::protobuf::RpcController* controller, |
| const GetDeleteBitmapUpdateLockRequest* request, |
| GetDeleteBitmapUpdateLockResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void remove_delete_bitmap(google::protobuf::RpcController* controller, |
| const RemoveDeleteBitmapRequest* request, |
| RemoveDeleteBitmapResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void remove_delete_bitmap_update_lock(google::protobuf::RpcController* controller, |
| const RemoveDeleteBitmapUpdateLockRequest* request, |
| RemoveDeleteBitmapUpdateLockResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| // cloud control get cluster's status by this api |
| void get_cluster_status(google::protobuf::RpcController* controller, |
| const GetClusterStatusRequest* request, |
| GetClusterStatusResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void get_rl_task_commit_attach(::google::protobuf::RpcController* controller, |
| const GetRLTaskCommitAttachRequest* request, |
| GetRLTaskCommitAttachResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void get_streaming_task_commit_attach(::google::protobuf::RpcController* controller, |
| const GetStreamingTaskCommitAttachRequest* request, |
| GetStreamingTaskCommitAttachResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void delete_streaming_job(::google::protobuf::RpcController* controller, |
| const DeleteStreamingJobRequest* request, |
| DeleteStreamingJobResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void reset_rl_progress(::google::protobuf::RpcController* controller, |
| const ResetRLProgressRequest* request, ResetRLProgressResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void get_txn_id(::google::protobuf::RpcController* controller, const GetTxnIdRequest* request, |
| GetTxnIdResponse* response, ::google::protobuf::Closure* done) override; |
| |
| void get_schema_dict(::google::protobuf::RpcController* controller, |
| const GetSchemaDictRequest* request, GetSchemaDictResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| // ATTN: If you add a new method, please also add the corresponding implementation in `MetaServiceProxy`. |
| |
| std::pair<MetaServiceCode, std::string> get_instance_info(const std::string& instance_id, |
| const std::string& cloud_unique_id, |
| InstanceInfoPB* instance); |
| |
| MetaServiceResponseStatus fix_tablet_stats(std::string cloud_unique_id_str, |
| std::string table_id_str); |
| |
| void get_delete_bitmap_lock_version(std::string& use_version, std::string& instance_id); |
| |
| void begin_snapshot(::google::protobuf::RpcController* controller, |
| const BeginSnapshotRequest* request, BeginSnapshotResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void update_snapshot(::google::protobuf::RpcController* controller, |
| const UpdateSnapshotRequest* request, UpdateSnapshotResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void commit_snapshot(::google::protobuf::RpcController* controller, |
| const CommitSnapshotRequest* request, CommitSnapshotResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void abort_snapshot(::google::protobuf::RpcController* controller, |
| const AbortSnapshotRequest* request, AbortSnapshotResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void list_snapshot(::google::protobuf::RpcController* controller, |
| const ListSnapshotRequest* request, ListSnapshotResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void drop_snapshot(::google::protobuf::RpcController* controller, |
| const DropSnapshotRequest* request, DropSnapshotResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| void clone_instance(::google::protobuf::RpcController* controller, |
| const CloneInstanceRequest* request, CloneInstanceResponse* response, |
| ::google::protobuf::Closure* done) override; |
| |
| private: |
| std::pair<MetaServiceCode, std::string> alter_instance( |
| const AlterInstanceRequest* request, |
| std::function<std::pair<MetaServiceCode, std::string>(InstanceInfoPB*)> action); |
| |
| bool get_mow_tablet_stats_and_meta(MetaServiceCode& code, std::string& msg, |
| const GetDeleteBitmapUpdateLockRequest* request, |
| GetDeleteBitmapUpdateLockResponse* response, |
| std::string& instance_id, std::string& lock_key, |
| std::string lock_use_version, KVStats& stats); |
| |
| void get_delete_bitmap_update_lock_v2(google::protobuf::RpcController* controller, |
| const GetDeleteBitmapUpdateLockRequest* request, |
| GetDeleteBitmapUpdateLockResponse* response, |
| ::google::protobuf::Closure* done, |
| std::string& instance_id, MetaServiceCode& code, |
| std::string& msg, std::stringstream& ss, KVStats& stats); |
| |
| void get_delete_bitmap_update_lock_v1(google::protobuf::RpcController* controller, |
| const GetDeleteBitmapUpdateLockRequest* request, |
| GetDeleteBitmapUpdateLockResponse* response, |
| ::google::protobuf::Closure* done, |
| std::string& instance_id, MetaServiceCode& code, |
| std::string& msg, std::stringstream& ss, KVStats& stats); |
| |
| void remove_delete_bitmap_update_lock_v2(google::protobuf::RpcController* controller, |
| const RemoveDeleteBitmapUpdateLockRequest* request, |
| RemoveDeleteBitmapUpdateLockResponse* response, |
| ::google::protobuf::Closure* done, |
| std::string& instance_id, MetaServiceCode& code, |
| std::string& msg, std::stringstream& ss, |
| KVStats& stats); |
| |
| void remove_delete_bitmap_update_lock_v1(google::protobuf::RpcController* controller, |
| const RemoveDeleteBitmapUpdateLockRequest* request, |
| RemoveDeleteBitmapUpdateLockResponse* response, |
| ::google::protobuf::Closure* done, |
| std::string& instance_id, MetaServiceCode& code, |
| std::string& msg, std::stringstream& ss, |
| KVStats& stats); |
| |
| void update_table_version(Transaction* txn, std::string_view instance_id, int64_t db_id, |
| int64_t table_id); |
| |
| bool is_version_read_enabled(std::string_view instance_id) const; |
| bool is_version_write_enabled(std::string_view instance_id) const; |
| |
| void commit_txn_immediately( |
| const CommitTxnRequest* request, CommitTxnResponse* response, MetaServiceCode& code, |
| std::string& msg, const std::string& instance_id, int64_t db_id, |
| std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta, |
| TxnErrorCode& err, KVStats& stats); |
| |
| void commit_txn_eventually( |
| const CommitTxnRequest* request, CommitTxnResponse* response, MetaServiceCode& code, |
| std::string& msg, const std::string& instance_id, int64_t db_id, |
| const std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& tmp_rowsets_meta, |
| KVStats& stats); |
| |
| void commit_txn_with_sub_txn(const CommitTxnRequest* request, CommitTxnResponse* response, |
| MetaServiceCode& code, std::string& msg, |
| const std::string& instance_id, int64_t db_id, KVStats& stats); |
| |
| // Get the first pending transaction ID for a partition. If there no any pending transaction, |
| // `first_txn_id` will be set to -1. |
| void get_partition_pending_txn_id(std::string_view instance_id, int64_t db_id, int64_t table_id, |
| int64_t partition_id, int64_t tablet_id, |
| std::stringstream& ss, MetaServiceCode& code, |
| std::string& msg, int64_t& first_txn_id, |
| int64_t& partition_version, Transaction* txn); |
| |
| // Get versions in batch, Only for versioned read. |
| std::pair<MetaServiceCode, std::string> batch_get_table_versions( |
| const GetVersionRequest* request, GetVersionResponse* response, |
| std::string_view instance_id, KVStats& stats); |
| std::pair<MetaServiceCode, std::string> batch_get_partition_versions( |
| const GetVersionRequest* request, GetVersionResponse* response, |
| std::string_view instance_id, KVStats& stats); |
| |
| std::shared_ptr<TxnKv> txn_kv_; |
| std::shared_ptr<ResourceManager> resource_mgr_; |
| std::shared_ptr<RateLimiter> rate_limiter_; |
| std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_; |
| std::shared_ptr<DeleteBitmapLockWhiteList> delete_bitmap_lock_white_list_; |
| std::shared_ptr<SnapshotManager> snapshot_manager_; |
| }; |
| |
| class MetaServiceProxy final : public MetaService { |
| public: |
| MetaServiceProxy(std::unique_ptr<MetaServiceImpl> service) : impl_(std::move(service)) {} |
| ~MetaServiceProxy() override = default; |
| MetaServiceProxy(const MetaServiceProxy&) = delete; |
| MetaServiceProxy& operator=(const MetaServiceProxy&) = delete; |
| |
| [[nodiscard]] const std::shared_ptr<TxnKv>& txn_kv() const { return impl_->txn_kv(); } |
| [[nodiscard]] const std::shared_ptr<RateLimiter>& rate_limiter() const { |
| return impl_->rate_limiter(); |
| } |
| [[nodiscard]] const std::shared_ptr<ResourceManager>& resource_mgr() const { |
| return impl_->resource_mgr(); |
| } |
| |
| void begin_txn(::google::protobuf::RpcController* controller, const BeginTxnRequest* request, |
| BeginTxnResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::begin_txn, controller, request, response, done); |
| } |
| |
| void precommit_txn(::google::protobuf::RpcController* controller, |
| const PrecommitTxnRequest* request, PrecommitTxnResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::precommit_txn, controller, request, response, done); |
| } |
| |
| void commit_txn(::google::protobuf::RpcController* controller, const CommitTxnRequest* request, |
| CommitTxnResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::commit_txn, controller, request, response, done); |
| } |
| |
| void abort_txn(::google::protobuf::RpcController* controller, const AbortTxnRequest* request, |
| AbortTxnResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::abort_txn, controller, request, response, done); |
| } |
| |
| void get_txn(::google::protobuf::RpcController* controller, const GetTxnRequest* request, |
| GetTxnResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_txn, controller, request, response, done); |
| } |
| |
| void get_current_max_txn_id(::google::protobuf::RpcController* controller, |
| const GetCurrentMaxTxnRequest* request, |
| GetCurrentMaxTxnResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_current_max_txn_id, controller, request, response, done); |
| } |
| |
| void begin_sub_txn(::google::protobuf::RpcController* controller, |
| const BeginSubTxnRequest* request, BeginSubTxnResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::begin_sub_txn, controller, request, response, done); |
| } |
| |
| void abort_sub_txn(::google::protobuf::RpcController* controller, |
| const AbortSubTxnRequest* request, AbortSubTxnResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::abort_sub_txn, controller, request, response, done); |
| } |
| |
| void check_txn_conflict(::google::protobuf::RpcController* controller, |
| const CheckTxnConflictRequest* request, |
| CheckTxnConflictResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::check_txn_conflict, controller, request, response, done); |
| } |
| |
| void abort_txn_with_coordinator(::google::protobuf::RpcController* controller, |
| const AbortTxnWithCoordinatorRequest* request, |
| AbortTxnWithCoordinatorResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::abort_txn_with_coordinator, controller, request, response, |
| done); |
| } |
| |
| void clean_txn_label(::google::protobuf::RpcController* controller, |
| const CleanTxnLabelRequest* request, CleanTxnLabelResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::clean_txn_label, controller, request, response, done); |
| } |
| |
| void get_version(::google::protobuf::RpcController* controller, |
| const GetVersionRequest* request, GetVersionResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_version, controller, request, response, done); |
| } |
| |
| void create_tablets(::google::protobuf::RpcController* controller, |
| const CreateTabletsRequest* request, CreateTabletsResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::create_tablets, controller, request, response, done); |
| } |
| |
| void update_tablet(::google::protobuf::RpcController* controller, |
| const UpdateTabletRequest* request, UpdateTabletResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::update_tablet, controller, request, response, done); |
| } |
| |
| void get_tablet(::google::protobuf::RpcController* controller, const GetTabletRequest* request, |
| GetTabletResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_tablet, controller, request, response, done); |
| } |
| |
| void prepare_rowset(::google::protobuf::RpcController* controller, |
| const CreateRowsetRequest* request, CreateRowsetResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::prepare_rowset, controller, request, response, done); |
| } |
| |
| void commit_rowset(::google::protobuf::RpcController* controller, |
| const CreateRowsetRequest* request, CreateRowsetResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::commit_rowset, controller, request, response, done); |
| } |
| |
| void update_tmp_rowset(::google::protobuf::RpcController* controller, |
| const CreateRowsetRequest* request, CreateRowsetResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::update_tmp_rowset, controller, request, response, done); |
| } |
| |
| void get_rowset(::google::protobuf::RpcController* controller, const GetRowsetRequest* request, |
| GetRowsetResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_rowset, controller, request, response, done); |
| } |
| |
| void prepare_index(::google::protobuf::RpcController* controller, const IndexRequest* request, |
| IndexResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::prepare_index, controller, request, response, done); |
| } |
| |
| void commit_index(::google::protobuf::RpcController* controller, const IndexRequest* request, |
| IndexResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::commit_index, controller, request, response, done); |
| } |
| |
| void drop_index(::google::protobuf::RpcController* controller, const IndexRequest* request, |
| IndexResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::drop_index, controller, request, response, done); |
| } |
| |
| void check_kv(::google::protobuf::RpcController* controller, const CheckKVRequest* request, |
| CheckKVResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::check_kv, controller, request, response, done); |
| } |
| |
| void prepare_partition(::google::protobuf::RpcController* controller, |
| const PartitionRequest* request, PartitionResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::prepare_partition, controller, request, response, done); |
| } |
| |
| void commit_partition(::google::protobuf::RpcController* controller, |
| const PartitionRequest* request, PartitionResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::commit_partition, controller, request, response, done); |
| } |
| |
| void drop_partition(::google::protobuf::RpcController* controller, |
| const PartitionRequest* request, PartitionResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::drop_partition, controller, request, response, done); |
| } |
| |
| void prepare_restore_job(::google::protobuf::RpcController* controller, |
| const RestoreJobRequest* request, RestoreJobResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::prepare_restore_job, controller, request, response, done); |
| } |
| |
| void commit_restore_job(::google::protobuf::RpcController* controller, |
| const RestoreJobRequest* request, RestoreJobResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::commit_restore_job, controller, request, response, done); |
| } |
| |
| void finish_restore_job(::google::protobuf::RpcController* controller, |
| const RestoreJobRequest* request, RestoreJobResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::finish_restore_job, controller, request, response, done); |
| } |
| |
| void get_tablet_stats(::google::protobuf::RpcController* controller, |
| const GetTabletStatsRequest* request, GetTabletStatsResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_tablet_stats, controller, request, response, done); |
| } |
| |
| void start_tablet_job(::google::protobuf::RpcController* controller, |
| const StartTabletJobRequest* request, StartTabletJobResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::start_tablet_job, controller, request, response, done); |
| } |
| |
| void finish_tablet_job(::google::protobuf::RpcController* controller, |
| const FinishTabletJobRequest* request, FinishTabletJobResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::finish_tablet_job, controller, request, response, done); |
| } |
| |
| void http(::google::protobuf::RpcController* controller, const MetaServiceHttpRequest* request, |
| MetaServiceHttpResponse* response, ::google::protobuf::Closure* done) override { |
| impl_->http(controller, request, response, done); |
| } |
| |
| void get_obj_store_info(google::protobuf::RpcController* controller, |
| const GetObjStoreInfoRequest* request, |
| GetObjStoreInfoResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_obj_store_info, controller, request, response, done); |
| } |
| |
| void alter_obj_store_info(google::protobuf::RpcController* controller, |
| const AlterObjStoreInfoRequest* request, |
| AlterObjStoreInfoResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::alter_obj_store_info, controller, request, response, done); |
| } |
| |
| void alter_storage_vault(google::protobuf::RpcController* controller, |
| const AlterObjStoreInfoRequest* request, |
| AlterObjStoreInfoResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::alter_storage_vault, controller, request, response, done); |
| } |
| |
| void update_ak_sk(google::protobuf::RpcController* controller, const UpdateAkSkRequest* request, |
| UpdateAkSkResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::update_ak_sk, controller, request, response, done); |
| } |
| |
| void create_instance(google::protobuf::RpcController* controller, |
| const CreateInstanceRequest* request, CreateInstanceResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::create_instance, controller, request, response, done); |
| } |
| |
| void get_instance(google::protobuf::RpcController* controller, |
| const GetInstanceRequest* request, GetInstanceResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_instance, controller, request, response, done); |
| } |
| |
| void alter_instance(google::protobuf::RpcController* controller, |
| const AlterInstanceRequest* request, AlterInstanceResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::alter_instance, controller, request, response, done); |
| } |
| |
| void alter_cluster(google::protobuf::RpcController* controller, |
| const AlterClusterRequest* request, AlterClusterResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::alter_cluster, controller, request, response, done); |
| } |
| |
| void get_cluster(google::protobuf::RpcController* controller, const GetClusterRequest* request, |
| GetClusterResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_cluster, controller, request, response, done); |
| } |
| |
| void create_stage(google::protobuf::RpcController* controller, |
| const CreateStageRequest* request, CreateStageResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::create_stage, controller, request, response, done); |
| } |
| |
| void get_stage(google::protobuf::RpcController* controller, const GetStageRequest* request, |
| GetStageResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_stage, controller, request, response, done); |
| } |
| |
| void drop_stage(google::protobuf::RpcController* controller, const DropStageRequest* request, |
| DropStageResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::drop_stage, controller, request, response, done); |
| } |
| |
| void get_iam(google::protobuf::RpcController* controller, const GetIamRequest* request, |
| GetIamResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_iam, controller, request, response, done); |
| } |
| |
| void alter_iam(google::protobuf::RpcController* controller, const AlterIamRequest* request, |
| AlterIamResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::alter_iam, controller, request, response, done); |
| } |
| |
| void alter_ram_user(google::protobuf::RpcController* controller, |
| const AlterRamUserRequest* request, AlterRamUserResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::alter_ram_user, controller, request, response, done); |
| } |
| |
| void begin_copy(google::protobuf::RpcController* controller, const BeginCopyRequest* request, |
| BeginCopyResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::begin_copy, controller, request, response, done); |
| } |
| |
| void finish_copy(google::protobuf::RpcController* controller, const FinishCopyRequest* request, |
| FinishCopyResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::finish_copy, controller, request, response, done); |
| } |
| |
| void get_copy_job(google::protobuf::RpcController* controller, const GetCopyJobRequest* request, |
| GetCopyJobResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_copy_job, controller, request, response, done); |
| } |
| |
| void get_copy_files(google::protobuf::RpcController* controller, |
| const GetCopyFilesRequest* request, GetCopyFilesResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_copy_files, controller, request, response, done); |
| } |
| |
| // filter files that are loading or loaded in the input files, return files that are not loaded |
| void filter_copy_files(google::protobuf::RpcController* controller, |
| const FilterCopyFilesRequest* request, FilterCopyFilesResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::filter_copy_files, controller, request, response, done); |
| } |
| |
| void update_delete_bitmap(google::protobuf::RpcController* controller, |
| const UpdateDeleteBitmapRequest* request, |
| UpdateDeleteBitmapResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::update_delete_bitmap, controller, request, response, done); |
| } |
| |
| void get_delete_bitmap(google::protobuf::RpcController* controller, |
| const GetDeleteBitmapRequest* request, GetDeleteBitmapResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_delete_bitmap, controller, request, response, done); |
| } |
| |
| void get_delete_bitmap_update_lock(google::protobuf::RpcController* controller, |
| const GetDeleteBitmapUpdateLockRequest* request, |
| GetDeleteBitmapUpdateLockResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_delete_bitmap_update_lock, controller, request, response, |
| done); |
| } |
| |
| void remove_delete_bitmap(google::protobuf::RpcController* controller, |
| const RemoveDeleteBitmapRequest* request, |
| RemoveDeleteBitmapResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::remove_delete_bitmap, controller, request, response, done); |
| } |
| |
| void remove_delete_bitmap_update_lock(google::protobuf::RpcController* controller, |
| const RemoveDeleteBitmapUpdateLockRequest* request, |
| RemoveDeleteBitmapUpdateLockResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::remove_delete_bitmap_update_lock, controller, request, |
| response, done); |
| } |
| |
| // cloud control get cluster's status by this api |
| void get_cluster_status(google::protobuf::RpcController* controller, |
| const GetClusterStatusRequest* request, |
| GetClusterStatusResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_cluster_status, controller, request, response, done); |
| } |
| |
| void get_rl_task_commit_attach(::google::protobuf::RpcController* controller, |
| const GetRLTaskCommitAttachRequest* request, |
| GetRLTaskCommitAttachResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_rl_task_commit_attach, controller, request, response, |
| done); |
| } |
| |
| void get_streaming_task_commit_attach(::google::protobuf::RpcController* controller, |
| const GetStreamingTaskCommitAttachRequest* request, |
| GetStreamingTaskCommitAttachResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_streaming_task_commit_attach, controller, request, |
| response, done); |
| } |
| |
| void delete_streaming_job(::google::protobuf::RpcController* controller, |
| const DeleteStreamingJobRequest* request, |
| DeleteStreamingJobResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::delete_streaming_job, controller, request, response, done); |
| } |
| |
| void reset_rl_progress(::google::protobuf::RpcController* controller, |
| const ResetRLProgressRequest* request, ResetRLProgressResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::reset_rl_progress, controller, request, response, done); |
| } |
| |
| void get_txn_id(::google::protobuf::RpcController* controller, const GetTxnIdRequest* request, |
| GetTxnIdResponse* response, ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_txn_id, controller, request, response, done); |
| } |
| |
| void get_schema_dict(::google::protobuf::RpcController* controller, |
| const GetSchemaDictRequest* request, GetSchemaDictResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::get_schema_dict, controller, request, response, done); |
| } |
| |
| void get_delete_bitmap_lock_version(std::string& use_version, std::string& instance_id) { |
| impl_->get_delete_bitmap_lock_version(use_version, instance_id); |
| } |
| |
| void begin_snapshot(::google::protobuf::RpcController* controller, |
| const BeginSnapshotRequest* request, BeginSnapshotResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::begin_snapshot, controller, request, response, done); |
| } |
| |
| void update_snapshot(::google::protobuf::RpcController* controller, |
| const UpdateSnapshotRequest* request, UpdateSnapshotResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::update_snapshot, controller, request, response, done); |
| } |
| |
| void commit_snapshot(::google::protobuf::RpcController* controller, |
| const CommitSnapshotRequest* request, CommitSnapshotResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::commit_snapshot, controller, request, response, done); |
| } |
| |
| void abort_snapshot(::google::protobuf::RpcController* controller, |
| const AbortSnapshotRequest* request, AbortSnapshotResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::abort_snapshot, controller, request, response, done); |
| } |
| |
| void list_snapshot(::google::protobuf::RpcController* controller, |
| const ListSnapshotRequest* request, ListSnapshotResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::list_snapshot, controller, request, response, done); |
| } |
| |
| void drop_snapshot(::google::protobuf::RpcController* controller, |
| const DropSnapshotRequest* request, DropSnapshotResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::drop_snapshot, controller, request, response, done); |
| } |
| |
| void clone_instance(::google::protobuf::RpcController* controller, |
| const CloneInstanceRequest* request, CloneInstanceResponse* response, |
| ::google::protobuf::Closure* done) override { |
| call_impl(&cloud::MetaService::clone_instance, controller, request, response, done); |
| } |
| |
| private: |
| template <typename Request, typename Response> |
| using MetaServiceMethod = void (cloud::MetaService::*)(::google::protobuf::RpcController*, |
| const Request*, Response*, |
| ::google::protobuf::Closure*); |
| |
| template <typename Request, typename Response> |
| void call_impl(MetaServiceMethod<Request, Response> method, |
| ::google::protobuf::RpcController* ctrl, const Request* req, Response* resp, |
| ::google::protobuf::Closure* done) { |
| static_assert(std::is_base_of_v<::google::protobuf::Message, Request>); |
| static_assert(std::is_base_of_v<::google::protobuf::Message, Response>); |
| |
| using namespace std::chrono; |
| brpc::ClosureGuard done_guard(done); |
| |
| // life span of this defer MUST be longer than `done` |
| std::unique_ptr<int, std::function<void(int*)>> defer_injection( |
| (int*)(0x01), [&, this](int*) { idempotent_injection(method, req, resp); }); |
| |
| if (!config::enable_txn_store_retry) { |
| (impl_.get()->*method)(ctrl, req, resp, brpc::DoNothing()); |
| if (DCHECK_IS_ON()) { |
| MetaServiceCode code = resp->status().code(); |
| DCHECK_NE(code, MetaServiceCode::KV_TXN_STORE_GET_RETRYABLE) |
| << "KV_TXN_STORE_GET_RETRYABLE should not be sent back to client"; |
| DCHECK_NE(code, MetaServiceCode::KV_TXN_STORE_COMMIT_RETRYABLE) |
| << "KV_TXN_STORE_COMMIT_RETRYABLE should not be sent back to client"; |
| DCHECK_NE(code, MetaServiceCode::KV_TXN_STORE_CREATE_RETRYABLE) |
| << "KV_TXN_STORE_CREATE_RETRYABLE should not be sent back to client"; |
| } |
| return; |
| } |
| |
| TEST_SYNC_POINT("MetaServiceProxy::call_impl:1"); |
| |
| int32_t retry_times = 0; |
| uint64_t duration_ms = 0, retry_drift_ms = 0; |
| while (true) { |
| resp->Clear(); // reset the response message in case it is reused for retry |
| (impl_.get()->*method)(ctrl, req, resp, brpc::DoNothing()); |
| MetaServiceCode code = resp->status().code(); |
| if (code != MetaServiceCode::KV_TXN_STORE_GET_RETRYABLE && |
| code != MetaServiceCode::KV_TXN_STORE_COMMIT_RETRYABLE && |
| code != MetaServiceCode::KV_TXN_STORE_CREATE_RETRYABLE && |
| code != MetaServiceCode::KV_TXN_TOO_OLD && |
| (!config::enable_retry_txn_conflict || code != MetaServiceCode::KV_TXN_CONFLICT)) { |
| return; |
| } |
| |
| TEST_SYNC_POINT("MetaServiceProxy::call_impl:2"); |
| if (retry_times == 0) { |
| // the first retry, add random drift. |
| duration seed = duration_cast<nanoseconds>(steady_clock::now().time_since_epoch()); |
| std::default_random_engine rng(static_cast<uint64_t>(seed.count())); |
| retry_drift_ms = std::uniform_int_distribution<uint64_t>( |
| 0, config::txn_store_retry_base_intervals_ms)(rng); |
| } |
| |
| if (retry_times >= config::txn_store_retry_times || |
| // Retrying KV_TXN_TOO_OLD is very expensive, so we only retry once. |
| (retry_times > 1 && code == MetaServiceCode::KV_TXN_TOO_OLD)) { |
| // For KV_TXN_CONFLICT, we should return KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES, |
| // because BE will retries the KV_TXN_CONFLICT error. |
| resp->mutable_status()->set_code( |
| code == MetaServiceCode::KV_TXN_STORE_COMMIT_RETRYABLE ? KV_TXN_COMMIT_ERR |
| : code == MetaServiceCode::KV_TXN_STORE_GET_RETRYABLE ? KV_TXN_GET_ERR |
| : code == MetaServiceCode::KV_TXN_STORE_CREATE_RETRYABLE ? KV_TXN_CREATE_ERR |
| : code == MetaServiceCode::KV_TXN_CONFLICT |
| ? KV_TXN_CONFLICT_RETRY_EXCEEDED_MAX_TIMES |
| : MetaServiceCode::KV_TXN_TOO_OLD); |
| return; |
| } |
| |
| // 1 2 4 8 ... |
| duration_ms = |
| (1 << retry_times) * config::txn_store_retry_base_intervals_ms + retry_drift_ms; |
| TEST_SYNC_POINT_CALLBACK("MetaServiceProxy::call_impl_duration_ms", &duration_ms); |
| |
| retry_times += 1; |
| LOG(WARNING) << __PRETTY_FUNCTION__ << " sleep " << duration_ms |
| << " ms before next round, retry times left: " |
| << (config::txn_store_retry_times - retry_times) |
| << ", code: " << MetaServiceCode_Name(code) |
| << ", msg: " << resp->status().msg(); |
| bthread_usleep(duration_ms * 1000); |
| } |
| } |
| |
| template <typename Request, typename Response> |
| void idempotent_injection(MetaServiceMethod<Request, Response> method, const Request* requ, |
| Response* resp) { |
| if (!config::enable_idempotent_request_injection) return; |
| |
| using namespace std::chrono; |
| auto s = system_clock::now(); |
| static std::mt19937_64 rng(duration_cast<milliseconds>(s.time_since_epoch()).count()); |
| // clang-format off |
| // FIXME(gavin): make idempotent_request_replay_exclusion configurable via HTTP |
| static auto exclusion = []{ std::istringstream iss(config::idempotent_request_replay_exclusion); std::string e; std::unordered_set<std::string> r; |
| while (std::getline(iss, e, ',')) { r.insert(e); } return r; |
| }(); |
| auto f = new std::function<void()>([s, req = *requ, res = *resp, method, this]() mutable { // copy and capture |
| auto dist = std::uniform_int_distribution(-config::idempotent_request_replay_delay_range_ms, |
| config::idempotent_request_replay_delay_range_ms); |
| int64_t sleep_ms = config::idempotent_request_replay_delay_base_ms + dist(rng); |
| std::string debug_string = req.ShortDebugString(); |
| if constexpr (std::is_same_v<Request, GetTabletStatsRequest>) { |
| auto short_req = req; |
| if (short_req.tablet_idx_size() > 10) { |
| short_req.mutable_tablet_idx()->DeleteSubrange(10, req.tablet_idx_size() - 10); |
| } |
| debug_string = short_req.ShortDebugString(); |
| TEST_SYNC_POINT_CALLBACK("idempotent_injection_short_debug_string_for_get_stats", &short_req); |
| } |
| LOG(INFO) << " request_name=" << req.GetDescriptor()->name() |
| << " response_name=" << res.GetDescriptor()->name() |
| << " queue_ts=" << duration_cast<milliseconds>(s.time_since_epoch()).count() |
| << " now_ts=" << duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() |
| << " idempotent_request_replay_delay_base_ms=" << config::idempotent_request_replay_delay_base_ms |
| << " idempotent_request_replay_delay_range_ms=" << config::idempotent_request_replay_delay_range_ms |
| << " idempotent_request_replay_delay_ms=" << sleep_ms |
| << " request=" << debug_string; |
| if (sleep_ms < 0 || exclusion.count(req.GetDescriptor()->name())) return; |
| brpc::Controller ctrl; |
| bthread_usleep(sleep_ms * 1000); |
| (impl_.get()->*method)(&ctrl, &req, &res, brpc::DoNothing()); |
| }); |
| // clang-format on |
| bthread_t bid; |
| if (bthread_start_background(&bid, nullptr, run_bthread_work, f) != 0) { |
| LOG(WARNING) << "failed to bthread_start_background, run in current thread"; |
| run_bthread_work(f); |
| } |
| } |
| |
| std::unique_ptr<MetaServiceImpl> impl_; |
| }; |
| |
| } // namespace doris::cloud |