| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "txn_manager.h" |
| |
| #include <bvar/bvar.h> |
| #include <fmt/format.h> |
| #include <fmt/ranges.h> |
| #include <thrift/protocol/TDebugProtocol.h> |
| #include <time.h> |
| |
| #include <filesystem> |
| #include <iterator> |
| #include <list> |
| #include <new> |
| #include <ostream> |
| #include <queue> |
| #include <set> |
| #include <string> |
| |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "olap/data_dir.h" |
| #include "olap/delta_writer.h" |
| #include "olap/olap_common.h" |
| #include "olap/partial_update_info.h" |
| #include "olap/rowset/beta_rowset.h" |
| #include "olap/rowset/pending_rowset_helper.h" |
| #include "olap/rowset/rowset_meta.h" |
| #include "olap/rowset/rowset_meta_manager.h" |
| #include "olap/schema_change.h" |
| #include "olap/segment_loader.h" |
| #include "olap/storage_engine.h" |
| #include "olap/tablet_manager.h" |
| #include "olap/tablet_meta.h" |
| #include "olap/tablet_meta_manager.h" |
| #include "olap/task/engine_publish_version_task.h" |
| #include "util/debug_points.h" |
| #include "util/time.h" |
| |
| namespace doris { |
| class OlapMeta; |
| } // namespace doris |
| |
| using std::map; |
| using std::pair; |
| using std::set; |
| using std::string; |
| using std::stringstream; |
| using std::vector; |
| |
| namespace doris { |
| using namespace ErrorCode; |
| |
| bvar::Adder<int64_t> g_tablet_txn_info_txn_partitions_count("tablet_txn_info_txn_partitions_count"); |
| |
| TxnManager::TxnManager(StorageEngine& engine, int32_t txn_map_shard_size, int32_t txn_shard_size) |
| : _engine(engine), |
| _txn_map_shard_size(txn_map_shard_size), |
| _txn_shard_size(txn_shard_size) { |
| DCHECK_GT(_txn_map_shard_size, 0); |
| DCHECK_GT(_txn_shard_size, 0); |
| DCHECK_EQ(_txn_map_shard_size & (_txn_map_shard_size - 1), 0); |
| DCHECK_EQ(_txn_shard_size & (_txn_shard_size - 1), 0); |
| _txn_map_locks = new std::shared_mutex[_txn_map_shard_size]; |
| _txn_tablet_maps = new txn_tablet_map_t[_txn_map_shard_size]; |
| _txn_partition_maps = new txn_partition_map_t[_txn_map_shard_size]; |
| _txn_mutex = new std::shared_mutex[_txn_shard_size]; |
| _txn_tablet_delta_writer_map = new txn_tablet_delta_writer_map_t[_txn_map_shard_size]; |
| _txn_tablet_delta_writer_map_locks = new std::shared_mutex[_txn_map_shard_size]; |
| // For debugging |
| _tablet_version_cache = std::make_unique<TabletVersionCache>(100000); |
| } |
| |
| // prepare txn should always be allowed because ingest task will be retried |
| // could not distinguish rollup, schema change or base table, prepare txn successfully will allow |
| // ingest retried |
| Status TxnManager::prepare_txn(TPartitionId partition_id, const Tablet& tablet, |
| TTransactionId transaction_id, const PUniqueId& load_id, |
| bool ingest) { |
| // check if the tablet has already been shutdown. If it has, it indicates that |
| // it is an old tablet, and data should not be imported into the old tablet. |
| // Otherwise, it may lead to data loss during migration. |
| if (tablet.tablet_state() == TABLET_SHUTDOWN) { |
| return Status::InternalError<false>( |
| "The tablet's state is shutdown, tablet_id: {}. The tablet may have been dropped " |
| "or migrationed. Please check if the table has been dropped or try again.", |
| tablet.tablet_id()); |
| } |
| return prepare_txn(partition_id, transaction_id, tablet.tablet_id(), tablet.tablet_uid(), |
| load_id, ingest); |
| } |
| |
| // most used for ut |
| Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transaction_id, |
| TTabletId tablet_id, TabletUid tablet_uid, const PUniqueId& load_id, |
| bool ingest) { |
| TxnKey key(partition_id, transaction_id); |
| TabletInfo tablet_info(tablet_id, tablet_uid); |
| std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id)); |
| txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
| |
| DBUG_EXECUTE_IF("TxnManager.prepare_txn.random_failed", { |
| if (rand() % 100 < (100 * dp->param("percent", 0.5))) { |
| LOG_WARNING("TxnManager.prepare_txn.random_failed random failed") |
| .tag("txn_id", transaction_id) |
| .tag("tablet_id", tablet_id); |
| return Status::InternalError("debug prepare txn random failed"); |
| } |
| }); |
| DBUG_EXECUTE_IF("TxnManager.prepare_txn.wait", { |
| if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
| LOG_WARNING("TxnManager.prepare_txn.wait") |
| .tag("txn_id", transaction_id) |
| .tag("tablet_id", tablet_id) |
| .tag("wait ms", wait); |
| std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
| } |
| }); |
| |
| /// Step 1: check if the transaction is already exist |
| do { |
| auto iter = txn_tablet_map.find(key); |
| if (iter == txn_tablet_map.end()) { |
| break; |
| } |
| |
| // exist TxnKey |
| auto& txn_tablet_info_map = iter->second; |
| auto load_itr = txn_tablet_info_map.find(tablet_info); |
| if (load_itr == txn_tablet_info_map.end()) { |
| break; |
| } |
| |
| // found load for txn,tablet |
| auto& load_info = load_itr->second; |
| // case 1: user commit rowset, then the load id must be equal |
| // check if load id is equal |
| auto& load_id = load_info->load_id; |
| if (load_info->load_id.hi() == load_id.hi() && load_info->load_id.lo() == load_id.lo() && |
| load_info->rowset != nullptr) { |
| LOG(WARNING) << "find transaction exists when add to engine." |
| << "partition_id: " << key.first << ", transaction_id: " << key.second |
| << ", tablet: " << tablet_info.to_string(); |
| return Status::OK(); |
| } |
| } while (false); |
| |
| /// Step 2: check if there are too many transactions on running. |
| // check if there are too many transactions on running. |
| // if yes, reject the request. |
| txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); |
| if (txn_partition_map.size() > config::max_runnings_transactions_per_txn_map) { |
| return Status::Error<TOO_MANY_TRANSACTIONS>("too many transactions: {}, limit: {}", |
| txn_tablet_map.size(), |
| config::max_runnings_transactions_per_txn_map); |
| } |
| |
| /// Step 3: Add transaction to engine |
| // not found load id |
| // case 1: user start a new txn, rowset = null |
| // case 2: loading txn from meta env |
| auto load_info = std::make_shared<TabletTxnInfo>(load_id, nullptr, ingest); |
| load_info->prepare(); |
| if (!txn_tablet_map.contains(key)) { |
| g_tablet_txn_info_txn_partitions_count << 1; |
| } |
| txn_tablet_map[key][tablet_info] = std::move(load_info); |
| _insert_txn_partition_map_unlocked(transaction_id, partition_id); |
| VLOG_NOTICE << "add transaction to engine successfully." |
| << "partition_id: " << key.first << ", transaction_id: " << key.second |
| << ", tablet: " << tablet_info.to_string(); |
| return Status::OK(); |
| } |
| |
| Status TxnManager::commit_txn(TPartitionId partition_id, const Tablet& tablet, |
| TTransactionId transaction_id, const PUniqueId& load_id, |
| const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, |
| bool is_recovery, |
| std::shared_ptr<PartialUpdateInfo> partial_update_info) { |
| return commit_txn(tablet.data_dir()->get_meta(), partition_id, transaction_id, |
| tablet.tablet_id(), tablet.tablet_uid(), load_id, rowset_ptr, |
| std::move(guard), is_recovery, partial_update_info); |
| } |
| |
| Status TxnManager::publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, |
| TTransactionId transaction_id, const Version& version, |
| TabletPublishStatistics* stats, |
| std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info) { |
| return publish_txn(tablet->data_dir()->get_meta(), partition_id, transaction_id, |
| tablet->tablet_id(), tablet->tablet_uid(), version, stats, |
| extend_tablet_txn_info); |
| } |
| |
| void TxnManager::abort_txn(TPartitionId partition_id, TTransactionId transaction_id, |
| TTabletId tablet_id, TabletUid tablet_uid) { |
| pair<int64_t, int64_t> key(partition_id, transaction_id); |
| TabletInfo tablet_info(tablet_id, tablet_uid); |
| |
| std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); |
| |
| auto& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
| auto it = txn_tablet_map.find(key); |
| if (it == txn_tablet_map.end()) { |
| return; |
| } |
| |
| auto& tablet_txn_info_map = it->second; |
| auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info); |
| if (tablet_txn_info_iter == tablet_txn_info_map.end()) { |
| return; |
| } |
| |
| auto& txn_info = tablet_txn_info_iter->second; |
| txn_info->abort(); |
| } |
| |
| // delete the txn from manager if it is not committed(not have a valid rowset) |
| Status TxnManager::rollback_txn(TPartitionId partition_id, const Tablet& tablet, |
| TTransactionId transaction_id) { |
| return rollback_txn(partition_id, transaction_id, tablet.tablet_id(), tablet.tablet_uid()); |
| } |
| |
| Status TxnManager::delete_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, |
| TTransactionId transaction_id) { |
| return delete_txn(tablet->data_dir()->get_meta(), partition_id, transaction_id, |
| tablet->tablet_id(), tablet->tablet_uid()); |
| } |
| |
| void TxnManager::set_txn_related_delete_bitmap( |
| TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, |
| TabletUid tablet_uid, bool unique_key_merge_on_write, DeleteBitmapPtr delete_bitmap, |
| const RowsetIdUnorderedSet& rowset_ids, |
| std::shared_ptr<PartialUpdateInfo> partial_update_info) { |
| pair<int64_t, int64_t> key(partition_id, transaction_id); |
| TabletInfo tablet_info(tablet_id, tablet_uid); |
| |
| std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id)); |
| { |
| // get tx |
| std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
| txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
| auto it = txn_tablet_map.find(key); |
| if (it == txn_tablet_map.end()) { |
| LOG(WARNING) << "transaction_id: " << transaction_id |
| << " partition_id: " << partition_id << " may be cleared"; |
| return; |
| } |
| auto load_itr = it->second.find(tablet_info); |
| if (load_itr == it->second.end()) { |
| LOG(WARNING) << "transaction_id: " << transaction_id |
| << " partition_id: " << partition_id << " tablet_id: " << tablet_id |
| << " may be cleared"; |
| return; |
| } |
| auto& load_info = load_itr->second; |
| load_info->unique_key_merge_on_write = unique_key_merge_on_write; |
| load_info->delete_bitmap = delete_bitmap; |
| load_info->rowset_ids = rowset_ids; |
| load_info->partial_update_info = partial_update_info; |
| } |
| } |
| |
| Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId partition_id, |
| TTransactionId transaction_id, TTabletId tablet_id, |
| TabletUid tablet_uid, const PUniqueId& load_id, |
| const RowsetSharedPtr& rowset_ptr, PendingRowsetGuard guard, |
| bool is_recovery, |
| std::shared_ptr<PartialUpdateInfo> partial_update_info) { |
| if (partition_id < 1 || transaction_id < 1 || tablet_id < 1) { |
| LOG(WARNING) << "invalid commit req " |
| << " partition_id=" << partition_id << " transaction_id=" << transaction_id |
| << " tablet_id=" << tablet_id; |
| return Status::InternalError("invalid partition id"); |
| } |
| |
| pair<int64_t, int64_t> key(partition_id, transaction_id); |
| TabletInfo tablet_info(tablet_id, tablet_uid); |
| if (rowset_ptr == nullptr) { |
| return Status::Error<ROWSET_INVALID>( |
| "could not commit txn because rowset ptr is null. partition_id: {}, " |
| "transaction_id: {}, tablet: {}", |
| key.first, key.second, tablet_info.to_string()); |
| } |
| |
| DBUG_EXECUTE_IF("TxnManager.commit_txn.random_failed", { |
| if (rand() % 100 < (100 * dp->param("percent", 0.5))) { |
| LOG_WARNING("TxnManager.commit_txn.random_failed") |
| .tag("txn_id", transaction_id) |
| .tag("tablet_id", tablet_id); |
| return Status::InternalError("debug commit txn random failed"); |
| } |
| }); |
| DBUG_EXECUTE_IF("TxnManager.commit_txn.wait", { |
| if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
| LOG_WARNING("TxnManager.commit_txn.wait") |
| .tag("txn_id", transaction_id) |
| .tag("tablet_id", tablet_id) |
| .tag("wait ms", wait); |
| std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
| } |
| }); |
| |
| std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id)); |
| // this while loop just run only once, just for if break |
| do { |
| // get tx |
| std::shared_lock rdlock(_get_txn_map_lock(transaction_id)); |
| auto rs_pb = rowset_ptr->rowset_meta()->get_rowset_pb(); |
| // TODO(dx): remove log after fix partition id eq 0 bug |
| if (!rs_pb.has_partition_id() || rs_pb.partition_id() == 0) { |
| rowset_ptr->rowset_meta()->set_partition_id(partition_id); |
| LOG(WARNING) << "cant get partition id from rs pb, get from func arg partition_id=" |
| << partition_id; |
| } |
| txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
| auto it = txn_tablet_map.find(key); |
| if (it == txn_tablet_map.end()) { |
| break; |
| } |
| |
| auto load_itr = it->second.find(tablet_info); |
| if (load_itr == it->second.end()) { |
| break; |
| } |
| |
| // found load for txn,tablet |
| // case 1: user commit rowset, then the load id must be equal |
| auto& load_info = load_itr->second; |
| // check if load id is equal |
| if (load_info->rowset == nullptr) { |
| break; |
| } |
| |
| if (load_info->load_id.hi() != load_id.hi() || load_info->load_id.lo() != load_id.lo()) { |
| break; |
| } |
| |
| // find a rowset with same rowset id, then it means a duplicate call |
| if (load_info->rowset->rowset_id() == rowset_ptr->rowset_id()) { |
| LOG(INFO) << "find rowset exists when commit transaction to engine." |
| << "partition_id: " << key.first << ", transaction_id: " << key.second |
| << ", tablet: " << tablet_info.to_string() |
| << ", rowset_id: " << load_info->rowset->rowset_id(); |
| // Should not remove this rowset from pending rowsets |
| load_info->pending_rs_guard = std::move(guard); |
| return Status::OK(); |
| } |
| |
| // find a rowset with different rowset id, then it should not happen, just return errors |
| return Status::Error<PUSH_TRANSACTION_ALREADY_EXIST>( |
| "find rowset exists when commit transaction to engine. but rowset ids are not " |
| "same. partition_id: {}, transaction_id: {}, tablet: {}, exist rowset_id: {}, new " |
| "rowset_id: {}", |
| key.first, key.second, tablet_info.to_string(), |
| load_info->rowset->rowset_id().to_string(), rowset_ptr->rowset_id().to_string()); |
| } while (false); |
| |
| // if not in recovery mode, then should persist the meta to meta env |
| // save meta need access disk, it maybe very slow, so that it is not in global txn lock |
| // it is under a single txn lock |
| if (!is_recovery) { |
| Status save_status = |
| RowsetMetaManager::save(meta, tablet_uid, rowset_ptr->rowset_id(), |
| rowset_ptr->rowset_meta()->get_rowset_pb(), false); |
| DBUG_EXECUTE_IF("TxnManager.RowsetMetaManager.save_wait", { |
| if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
| LOG_WARNING("TxnManager.RowsetMetaManager.save_wait") |
| .tag("txn_id", transaction_id) |
| .tag("tablet_id", tablet_id) |
| .tag("wait ms", wait); |
| std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
| } |
| }); |
| if (!save_status.ok()) { |
| save_status.append(fmt::format(", txn id: {}", transaction_id)); |
| return save_status; |
| } |
| |
| if (partial_update_info && partial_update_info->is_partial_update()) { |
| PartialUpdateInfoPB partial_update_info_pb; |
| partial_update_info->to_pb(&partial_update_info_pb); |
| save_status = RowsetMetaManager::save_partial_update_info( |
| meta, tablet_id, partition_id, transaction_id, partial_update_info_pb); |
| if (!save_status.ok()) { |
| save_status.append(fmt::format(", txn_id: {}", transaction_id)); |
| return save_status; |
| } |
| } |
| } |
| |
| TabletSharedPtr tablet; |
| std::shared_ptr<PartialUpdateInfo> decoded_partial_update_info {nullptr}; |
| if (is_recovery) { |
| tablet = _engine.tablet_manager()->get_tablet(tablet_id, tablet_uid); |
| if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) { |
| PartialUpdateInfoPB partial_update_info_pb; |
| auto st = RowsetMetaManager::try_get_partial_update_info( |
| meta, tablet_id, partition_id, transaction_id, &partial_update_info_pb); |
| if (st.ok()) { |
| decoded_partial_update_info = std::make_shared<PartialUpdateInfo>(); |
| decoded_partial_update_info->from_pb(&partial_update_info_pb); |
| DCHECK(decoded_partial_update_info->is_partial_update()); |
| } else if (!st.is<META_KEY_NOT_FOUND>()) { |
| // the load is not a partial update |
| return st; |
| } |
| } |
| } |
| |
| { |
| std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
| auto load_info = std::make_shared<TabletTxnInfo>(load_id, rowset_ptr); |
| load_info->pending_rs_guard = std::move(guard); |
| if (is_recovery) { |
| if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) { |
| load_info->unique_key_merge_on_write = true; |
| load_info->delete_bitmap.reset(new DeleteBitmap(tablet->tablet_id())); |
| if (decoded_partial_update_info) { |
| LOG_INFO( |
| "get partial update info from RocksDB during recovery. txn_id={}, " |
| "partition_id={}, tablet_id={}, partial_update_info=[{}]", |
| transaction_id, partition_id, tablet_id, |
| decoded_partial_update_info->summary()); |
| load_info->partial_update_info = decoded_partial_update_info; |
| } |
| } |
| } |
| load_info->commit(); |
| |
| txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
| txn_tablet_map[key][tablet_info] = std::move(load_info); |
| _insert_txn_partition_map_unlocked(transaction_id, partition_id); |
| VLOG_NOTICE << "commit transaction to engine successfully." |
| << " partition_id: " << key.first << ", transaction_id: " << key.second |
| << ", tablet: " << tablet_info.to_string() |
| << ", rowsetid: " << rowset_ptr->rowset_id() |
| << ", version: " << rowset_ptr->version().first; |
| } |
| return Status::OK(); |
| } |
| |
| // remove a txn from txn manager |
| Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, |
| TTransactionId transaction_id, TTabletId tablet_id, |
| TabletUid tablet_uid, const Version& version, |
| TabletPublishStatistics* stats, |
| std::shared_ptr<TabletTxnInfo>& extend_tablet_txn_info) { |
| auto tablet = _engine.tablet_manager()->get_tablet(tablet_id); |
| if (tablet == nullptr) { |
| return Status::OK(); |
| } |
| DCHECK(stats != nullptr); |
| |
| pair<int64_t, int64_t> key(partition_id, transaction_id); |
| TabletInfo tablet_info(tablet_id, tablet_uid); |
| RowsetSharedPtr rowset; |
| std::shared_ptr<TabletTxnInfo> tablet_txn_info; |
| int64_t t1 = MonotonicMicros(); |
| /// Step 1: get rowset, tablet_txn_info by key |
| { |
| std::shared_lock txn_rlock(_get_txn_lock(transaction_id)); |
| std::shared_lock txn_map_rlock(_get_txn_map_lock(transaction_id)); |
| stats->lock_wait_time_us += MonotonicMicros() - t1; |
| |
| txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
| if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) { |
| auto& tablet_map = it->second; |
| if (auto txn_info_iter = tablet_map.find(tablet_info); |
| txn_info_iter != tablet_map.end()) { |
| // found load for txn,tablet |
| // case 1: user commit rowset, then the load id must be equal |
| tablet_txn_info = txn_info_iter->second; |
| extend_tablet_txn_info = tablet_txn_info; |
| rowset = tablet_txn_info->rowset; |
| } |
| } |
| } |
| if (rowset == nullptr) { |
| return Status::Error<TRANSACTION_NOT_EXIST>( |
| "publish txn failed, rowset not found. partition_id={}, transaction_id={}, " |
| "tablet={}", |
| partition_id, transaction_id, tablet_info.to_string()); |
| } |
| DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_before_save_rs_meta", { |
| if (rand() % 100 < (100 * dp->param("percent", 0.5))) { |
| LOG_WARNING("TxnManager.publish_txn.random_failed_before_save_rs_meta") |
| .tag("txn_id", transaction_id) |
| .tag("tablet_id", tablet_id); |
| return Status::InternalError("debug publish txn before save rs meta random failed"); |
| } |
| }); |
| DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_before_save_rs_meta", { |
| if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
| LOG_WARNING("TxnManager.publish_txn.wait_before_save_rs_meta") |
| .tag("txn_id", transaction_id) |
| .tag("tablet_id", tablet_id) |
| .tag("wait ms", wait); |
| std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
| } |
| }); |
| |
| /// Step 2: make rowset visible |
| // save meta need access disk, it maybe very slow, so that it is not in global txn lock |
| // it is under a single txn lock |
| // TODO(ygl): rowset is already set version here, memory is changed, if save failed |
| // it maybe a fatal error |
| rowset->make_visible(version); |
| |
| DBUG_EXECUTE_IF("TxnManager.publish_txn.random_failed_after_save_rs_meta", { |
| if (rand() % 100 < (100 * dp->param("percent", 0.5))) { |
| LOG_WARNING("TxnManager.publish_txn.random_failed_after_save_rs_meta") |
| .tag("txn_id", transaction_id) |
| .tag("tablet_id", tablet_id); |
| return Status::InternalError("debug publish txn after save rs meta random failed"); |
| } |
| }); |
| DBUG_EXECUTE_IF("TxnManager.publish_txn.wait_after_save_rs_meta", { |
| if (auto wait = dp->param<int>("duration", 0); wait > 0) { |
| LOG_WARNING("TxnManager.publish_txn.wait_after_save_rs_meta") |
| .tag("txn_id", transaction_id) |
| .tag("tablet_id", tablet_id) |
| .tag("wait ms", wait); |
| std::this_thread::sleep_for(std::chrono::milliseconds(wait)); |
| } |
| }); |
| // update delete_bitmap |
| if (tablet_txn_info->unique_key_merge_on_write) { |
| int64_t t2 = MonotonicMicros(); |
| if (rowset->num_segments() > 1 && |
| !tablet_txn_info->delete_bitmap->has_calculated_for_multi_segments( |
| rowset->rowset_id())) { |
| // delete bitmap is empty, should re-calculate delete bitmaps between segments |
| std::vector<segment_v2::SegmentSharedPtr> segments; |
| RETURN_IF_ERROR(std::static_pointer_cast<BetaRowset>(rowset)->load_segments(&segments)); |
| RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments( |
| rowset->tablet_schema(), rowset->rowset_id(), segments, |
| tablet_txn_info->delete_bitmap)); |
| } |
| |
| RETURN_IF_ERROR( |
| Tablet::update_delete_bitmap(tablet, tablet_txn_info.get(), transaction_id)); |
| int64_t t3 = MonotonicMicros(); |
| stats->calc_delete_bitmap_time_us = t3 - t2; |
| RETURN_IF_ERROR(TabletMetaManager::save_delete_bitmap( |
| tablet->data_dir(), tablet->tablet_id(), tablet_txn_info->delete_bitmap, |
| version.second)); |
| stats->save_meta_time_us = MonotonicMicros() - t3; |
| } |
| |
| /// Step 3: add to binlog |
| auto enable_binlog = tablet->is_enable_binlog(); |
| if (enable_binlog) { |
| auto status = rowset->add_to_binlog(); |
| if (!status.ok()) { |
| return Status::Error<ROWSET_ADD_TO_BINLOG_FAILED>( |
| "add rowset to binlog failed. when publish txn rowset_id: {}, tablet id: {}, " |
| "txn id: {}, status: {}", |
| rowset->rowset_id().to_string(), tablet_id, transaction_id, |
| status.to_string_no_stack()); |
| } |
| } |
| |
| /// Step 4: save meta |
| int64_t t5 = MonotonicMicros(); |
| auto status = RowsetMetaManager::save(meta, tablet_uid, rowset->rowset_id(), |
| rowset->rowset_meta()->get_rowset_pb(), enable_binlog); |
| stats->save_meta_time_us += MonotonicMicros() - t5; |
| if (!status.ok()) { |
| status.append(fmt::format(", txn id: {}", transaction_id)); |
| return status; |
| } |
| |
| if (tablet_txn_info->unique_key_merge_on_write && tablet_txn_info->partial_update_info && |
| tablet_txn_info->partial_update_info->is_partial_update()) { |
| status = RowsetMetaManager::remove_partial_update_info(meta, tablet_id, partition_id, |
| transaction_id); |
| if (!status) { |
| // discard the error status and print the warning log |
| LOG_WARNING( |
| "fail to remove partial update info from RocksDB. txn_id={}, rowset_id={}, " |
| "tablet_id={}, tablet_uid={}", |
| transaction_id, rowset->rowset_id().to_string(), tablet_id, |
| tablet_uid.to_string()); |
| } |
| } |
| |
| // TODO(Drogon): remove these test codes |
| if (enable_binlog) { |
| auto version_str = fmt::format("{}", version.first); |
| VLOG_DEBUG << fmt::format("tabletid: {}, version: {}, binlog filepath: {}", tablet_id, |
| version_str, tablet->get_binlog_filepath(version_str)); |
| } |
| |
| /// Step 5: remove tablet_info from tnx_tablet_map |
| // txn_tablet_map[key] empty, remove key from txn_tablet_map |
| int64_t t6 = MonotonicMicros(); |
| std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id)); |
| std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
| stats->lock_wait_time_us += MonotonicMicros() - t6; |
| _remove_txn_tablet_info_unlocked(partition_id, transaction_id, tablet_id, tablet_uid, txn_lock, |
| wrlock); |
| VLOG_NOTICE << "publish txn successfully." |
| << " partition_id: " << key.first << ", txn_id: " << key.second |
| << ", tablet_id: " << tablet_info.tablet_id << ", rowsetid: " << rowset->rowset_id() |
| << ", version: " << version.first << "," << version.second; |
| return status; |
| } |
| |
| void TxnManager::_remove_txn_tablet_info_unlocked(TPartitionId partition_id, |
| TTransactionId transaction_id, |
| TTabletId tablet_id, TabletUid tablet_uid, |
| std::lock_guard<std::shared_mutex>& txn_lock, |
| std::lock_guard<std::shared_mutex>& wrlock) { |
| std::pair<int64_t, int64_t> key {partition_id, transaction_id}; |
| TabletInfo tablet_info {tablet_id, tablet_uid}; |
| txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
| if (auto it = txn_tablet_map.find(key); it != txn_tablet_map.end()) { |
| it->second.erase(tablet_info); |
| if (it->second.empty()) { |
| txn_tablet_map.erase(it); |
| g_tablet_txn_info_txn_partitions_count << -1; |
| _clear_txn_partition_map_unlocked(transaction_id, partition_id); |
| } |
| } |
| } |
| |
| void TxnManager::remove_txn_tablet_info(TPartitionId partition_id, TTransactionId transaction_id, |
| TTabletId tablet_id, TabletUid tablet_uid) { |
| std::lock_guard<std::shared_mutex> txn_lock(_get_txn_lock(transaction_id)); |
| std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
| _remove_txn_tablet_info_unlocked(partition_id, transaction_id, tablet_id, tablet_uid, txn_lock, |
| wrlock); |
| } |
| |
| // txn could be rollbacked if it does not have related rowset |
| // if the txn has related rowset then could not rollback it, because it |
| // may be committed in another thread and our current thread meets errors when writing to data file |
| // BE has to wait for fe call clear txn api |
| Status TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId transaction_id, |
| TTabletId tablet_id, TabletUid tablet_uid) { |
| pair<int64_t, int64_t> key(partition_id, transaction_id); |
| TabletInfo tablet_info(tablet_id, tablet_uid); |
| |
| std::lock_guard<std::shared_mutex> wrlock(_get_txn_map_lock(transaction_id)); |
| txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
| |
| auto it = txn_tablet_map.find(key); |
| if (it == txn_tablet_map.end()) { |
| return Status::OK(); |
| } |
| |
| auto& tablet_txn_info_map = it->second; |
| if (auto load_itr = tablet_txn_info_map.find(tablet_info); |
| load_itr != tablet_txn_info_map.end()) { |
| // found load for txn,tablet |
| // case 1: user commit rowset, then the load id must be equal |
| const auto& load_info = load_itr->second; |
| if (load_info->rowset != nullptr) { |
| return Status::Error<TRANSACTION_ALREADY_COMMITTED>( |
| "if rowset is not null, it means other thread may commit the rowset should " |
| "not delete txn any more"); |
| } |
| } |
| |
| tablet_txn_info_map.erase(tablet_info); |
| LOG(INFO) << "rollback transaction from engine successfully." |
| << " partition_id: " << key.first << ", transaction_id: " << key.second |
| << ", tablet: " << tablet_info.to_string(); |
| if (tablet_txn_info_map.empty()) { |
| txn_tablet_map.erase(it); |
| g_tablet_txn_info_txn_partitions_count << -1; |
| _clear_txn_partition_map_unlocked(transaction_id, partition_id); |
| } |
| return Status::OK(); |
| } |
| |
| // fe call this api to clear unused rowsets in be |
| // could not delete the rowset if it already has a valid version |
| Status TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id, |
| TTransactionId transaction_id, TTabletId tablet_id, |
| TabletUid tablet_uid) { |
| pair<int64_t, int64_t> key(partition_id, transaction_id); |
| TabletInfo tablet_info(tablet_id, tablet_uid); |
| std::lock_guard<std::shared_mutex> txn_wrlock(_get_txn_map_lock(transaction_id)); |
| txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
| auto it = txn_tablet_map.find(key); |
| if (it == txn_tablet_map.end()) { |
| return Status::Error<TRANSACTION_NOT_EXIST>("key not founded from txn_tablet_map"); |
| } |
| Status st = Status::OK(); |
| auto load_itr = it->second.find(tablet_info); |
| if (load_itr != it->second.end()) { |
| // found load for txn,tablet |
| // case 1: user commit rowset, then the load id must be equal |
| auto& load_info = load_itr->second; |
| auto& rowset = load_info->rowset; |
| if (rowset != nullptr && meta != nullptr) { |
| if (!rowset->is_pending()) { |
| st = Status::Error<TRANSACTION_ALREADY_COMMITTED>( |
| "could not delete transaction from engine, just remove it from memory not " |
| "delete from disk, because related rowset already published. partition_id: " |
| "{}, transaction_id: {}, tablet: {}, rowset id: {}, version: {}, state: {}", |
| key.first, key.second, tablet_info.to_string(), |
| rowset->rowset_id().to_string(), rowset->version().to_string(), |
| RowsetStatePB_Name(rowset->rowset_meta_state())); |
| } else { |
| static_cast<void>(RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id())); |
| #ifndef BE_TEST |
| _engine.add_unused_rowset(rowset); |
| #endif |
| VLOG_NOTICE << "delete transaction from engine successfully." |
| << " partition_id: " << key.first << ", transaction_id: " << key.second |
| << ", tablet: " << tablet_info.to_string() << ", rowset: " |
| << (rowset != nullptr ? rowset->rowset_id().to_string() : "0"); |
| } |
| } |
| it->second.erase(load_itr); |
| } |
| if (it->second.empty()) { |
| txn_tablet_map.erase(it); |
| g_tablet_txn_info_txn_partitions_count << -1; |
| _clear_txn_partition_map_unlocked(transaction_id, partition_id); |
| } |
| return st; |
| } |
| |
| void TxnManager::get_tablet_related_txns(TTabletId tablet_id, TabletUid tablet_uid, |
| int64_t* partition_id, |
| std::set<int64_t>* transaction_ids) { |
| if (partition_id == nullptr || transaction_ids == nullptr) { |
| LOG(WARNING) << "parameter is null when get transactions by tablet"; |
| return; |
| } |
| |
| TabletInfo tablet_info(tablet_id, tablet_uid); |
| for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
| std::shared_lock txn_rdlock(_txn_map_locks[i]); |
| txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i]; |
| for (auto& it : txn_tablet_map) { |
| if (it.second.find(tablet_info) != it.second.end()) { |
| *partition_id = it.first.first; |
| transaction_ids->insert(it.first.second); |
| VLOG_NOTICE << "find transaction on tablet." |
| << "partition_id: " << it.first.first |
| << ", transaction_id: " << it.first.second |
| << ", tablet: " << tablet_info.to_string(); |
| } |
| } |
| } |
| } |
| |
| // force drop all txns related with the tablet |
| // maybe lock error, because not get txn lock before remove from meta |
| void TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id, |
| TabletUid tablet_uid) { |
| TabletInfo tablet_info(tablet_id, tablet_uid); |
| for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
| std::lock_guard<std::shared_mutex> txn_wrlock(_txn_map_locks[i]); |
| txn_tablet_map_t& txn_tablet_map = _txn_tablet_maps[i]; |
| for (auto it = txn_tablet_map.begin(); it != txn_tablet_map.end();) { |
| auto load_itr = it->second.find(tablet_info); |
| if (load_itr != it->second.end()) { |
| auto& load_info = load_itr->second; |
| auto& rowset = load_info->rowset; |
| if (rowset != nullptr && meta != nullptr) { |
| LOG(INFO) << " delete transaction from engine " |
| << ", tablet: " << tablet_info.to_string() |
| << ", rowset id: " << rowset->rowset_id(); |
| static_cast<void>( |
| RowsetMetaManager::remove(meta, tablet_uid, rowset->rowset_id())); |
| } |
| LOG(INFO) << "remove tablet related txn." |
| << " partition_id: " << it->first.first |
| << ", transaction_id: " << it->first.second |
| << ", tablet: " << tablet_info.to_string() << ", rowset: " |
| << (rowset != nullptr ? rowset->rowset_id().to_string() : "0"); |
| it->second.erase(load_itr); |
| } |
| if (it->second.empty()) { |
| _clear_txn_partition_map_unlocked(it->first.second, it->first.first); |
| it = txn_tablet_map.erase(it); |
| g_tablet_txn_info_txn_partitions_count << -1; |
| } else { |
| ++it; |
| } |
| } |
| } |
| if (meta != nullptr) { |
| Status st = RowsetMetaManager::remove_tablet_related_partial_update_info(meta, tablet_id); |
| if (!st.ok()) { |
| LOG_WARNING("failed to partial update info, tablet_id={}, err={}", tablet_id, |
| st.to_string()); |
| } |
| } |
| } |
| |
| void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id, |
| TPartitionId partition_id, |
| std::map<TabletInfo, RowsetSharedPtr>* tablet_infos) { |
| // get tablets in this transaction |
| pair<int64_t, int64_t> key(partition_id, transaction_id); |
| std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); |
| txn_tablet_map_t& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
| auto it = txn_tablet_map.find(key); |
| if (it == txn_tablet_map.end()) { |
| VLOG_NOTICE << "could not find tablet for" |
| << " partition_id=" << partition_id << ", transaction_id=" << transaction_id; |
| return; |
| } |
| auto& load_info_map = it->second; |
| |
| // each tablet |
| for (auto& load_info : load_info_map) { |
| const TabletInfo& tablet_info = load_info.first; |
| // must not check rowset == null here, because if rowset == null |
| // publish version should failed |
| tablet_infos->emplace(tablet_info, load_info.second->rowset); |
| } |
| } |
| |
| void TxnManager::get_all_related_tablets(std::set<TabletInfo>* tablet_infos) { |
| for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
| std::shared_lock txn_rdlock(_txn_map_locks[i]); |
| for (auto& it : _txn_tablet_maps[i]) { |
| for (auto& tablet_load_it : it.second) { |
| tablet_infos->emplace(tablet_load_it.first); |
| } |
| } |
| } |
| } |
| |
| void TxnManager::get_all_commit_tablet_txn_info_by_tablet( |
| const Tablet& tablet, CommitTabletTxnInfoVec* commit_tablet_txn_info_vec) { |
| for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
| std::shared_lock txn_rdlock(_txn_map_locks[i]); |
| for (const auto& [txn_key, load_info_map] : _txn_tablet_maps[i]) { |
| auto tablet_load_it = load_info_map.find(tablet.get_tablet_info()); |
| if (tablet_load_it != load_info_map.end()) { |
| const auto& [_, load_info] = *tablet_load_it; |
| const auto& rowset = load_info->rowset; |
| const auto& delete_bitmap = load_info->delete_bitmap; |
| if (!rowset || !delete_bitmap) { |
| continue; |
| } |
| commit_tablet_txn_info_vec->push_back({ |
| .transaction_id = txn_key.second, |
| .partition_id = txn_key.first, |
| .delete_bitmap = delete_bitmap, |
| .rowset_ids = load_info->rowset_ids, |
| .partial_update_info = load_info->partial_update_info, |
| }); |
| } |
| } |
| } |
| } |
| |
| void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::vector<int64_t>>* expire_txn_map) { |
| int64_t now = UnixSeconds(); |
| // traverse the txn map, and get all expired txns |
| for (int32_t i = 0; i < _txn_map_shard_size; i++) { |
| std::shared_lock txn_rdlock(_txn_map_locks[i]); |
| for (auto&& [txn_key, tablet_txn_infos] : _txn_tablet_maps[i]) { |
| auto txn_id = txn_key.second; |
| for (auto&& [tablet_info, txn_info] : tablet_txn_infos) { |
| double diff = difftime(now, txn_info->creation_time); |
| if (diff < config::pending_data_expire_time_sec) { |
| continue; |
| } |
| |
| (*expire_txn_map)[tablet_info].push_back(txn_id); |
| if (VLOG_IS_ON(3)) { |
| VLOG_NOTICE << "find expired txn." |
| << " tablet=" << tablet_info.to_string() |
| << " transaction_id=" << txn_id << " exist_sec=" << diff; |
| } |
| } |
| } |
| } |
| } |
| |
| void TxnManager::get_partition_ids(const TTransactionId transaction_id, |
| std::vector<TPartitionId>* partition_ids) { |
| std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); |
| txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); |
| auto it = txn_partition_map.find(transaction_id); |
| if (it != txn_partition_map.end()) { |
| for (int64_t partition_id : it->second) { |
| partition_ids->push_back(partition_id); |
| } |
| } |
| } |
| |
| void TxnManager::_insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) { |
| txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); |
| auto find = txn_partition_map.find(transaction_id); |
| if (find == txn_partition_map.end()) { |
| txn_partition_map[transaction_id] = std::unordered_set<int64_t>(); |
| } |
| txn_partition_map[transaction_id].insert(partition_id); |
| } |
| |
| void TxnManager::_clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) { |
| txn_partition_map_t& txn_partition_map = _get_txn_partition_map(transaction_id); |
| auto it = txn_partition_map.find(transaction_id); |
| if (it != txn_partition_map.end()) { |
| it->second.erase(partition_id); |
| if (it->second.empty()) { |
| txn_partition_map.erase(it); |
| } |
| } |
| } |
| |
| void TxnManager::add_txn_tablet_delta_writer(int64_t transaction_id, int64_t tablet_id, |
| DeltaWriter* delta_writer) { |
| std::lock_guard<std::shared_mutex> txn_wrlock( |
| _get_txn_tablet_delta_writer_map_lock(transaction_id)); |
| txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map = |
| _get_txn_tablet_delta_writer_map(transaction_id); |
| auto find = txn_tablet_delta_writer_map.find(transaction_id); |
| if (find == txn_tablet_delta_writer_map.end()) { |
| txn_tablet_delta_writer_map[transaction_id] = std::map<int64_t, DeltaWriter*>(); |
| } |
| txn_tablet_delta_writer_map[transaction_id][tablet_id] = delta_writer; |
| } |
| |
| void TxnManager::finish_slave_tablet_pull_rowset(int64_t transaction_id, int64_t tablet_id, |
| int64_t node_id, bool is_succeed) { |
| std::lock_guard<std::shared_mutex> txn_wrlock( |
| _get_txn_tablet_delta_writer_map_lock(transaction_id)); |
| txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map = |
| _get_txn_tablet_delta_writer_map(transaction_id); |
| auto find_txn = txn_tablet_delta_writer_map.find(transaction_id); |
| if (find_txn == txn_tablet_delta_writer_map.end()) { |
| LOG(WARNING) << "delta writer manager is not exist, txn_id=" << transaction_id |
| << ", tablet_id=" << tablet_id; |
| return; |
| } |
| auto find_tablet = txn_tablet_delta_writer_map[transaction_id].find(tablet_id); |
| if (find_tablet == txn_tablet_delta_writer_map[transaction_id].end()) { |
| LOG(WARNING) << "delta writer is not exist, txn_id=" << transaction_id |
| << ", tablet_id=" << tablet_id; |
| return; |
| } |
| DeltaWriter* delta_writer = txn_tablet_delta_writer_map[transaction_id][tablet_id]; |
| delta_writer->finish_slave_tablet_pull_rowset(node_id, is_succeed); |
| } |
| |
| void TxnManager::clear_txn_tablet_delta_writer(int64_t transaction_id) { |
| std::lock_guard<std::shared_mutex> txn_wrlock( |
| _get_txn_tablet_delta_writer_map_lock(transaction_id)); |
| txn_tablet_delta_writer_map_t& txn_tablet_delta_writer_map = |
| _get_txn_tablet_delta_writer_map(transaction_id); |
| auto it = txn_tablet_delta_writer_map.find(transaction_id); |
| if (it != txn_tablet_delta_writer_map.end()) { |
| txn_tablet_delta_writer_map.erase(it); |
| } |
| VLOG_CRITICAL << "remove delta writer manager, txn_id=" << transaction_id; |
| } |
| |
| int64_t TxnManager::get_txn_by_tablet_version(int64_t tablet_id, int64_t version) { |
| char key[16]; |
| memcpy(key, &tablet_id, sizeof(int64_t)); |
| memcpy(key + sizeof(int64_t), &version, sizeof(int64_t)); |
| CacheKey cache_key((const char*)&key, sizeof(key)); |
| |
| auto* handle = _tablet_version_cache->lookup(cache_key); |
| if (handle == nullptr) { |
| return -1; |
| } |
| int64_t res = ((CacheValue*)_tablet_version_cache->value(handle))->value; |
| _tablet_version_cache->release(handle); |
| return res; |
| } |
| |
| void TxnManager::update_tablet_version_txn(int64_t tablet_id, int64_t version, int64_t txn_id) { |
| char key[16]; |
| memcpy(key, &tablet_id, sizeof(int64_t)); |
| memcpy(key + sizeof(int64_t), &version, sizeof(int64_t)); |
| CacheKey cache_key((const char*)&key, sizeof(key)); |
| |
| auto* value = new CacheValue; |
| value->value = txn_id; |
| auto* handle = _tablet_version_cache->insert(cache_key, value, 1, sizeof(txn_id), |
| CachePriority::NORMAL); |
| _tablet_version_cache->release(handle); |
| } |
| |
| TxnState TxnManager::get_txn_state(TPartitionId partition_id, TTransactionId transaction_id, |
| TTabletId tablet_id, TabletUid tablet_uid) { |
| pair<int64_t, int64_t> key(partition_id, transaction_id); |
| TabletInfo tablet_info(tablet_id, tablet_uid); |
| |
| std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id)); |
| |
| auto& txn_tablet_map = _get_txn_tablet_map(transaction_id); |
| auto it = txn_tablet_map.find(key); |
| if (it == txn_tablet_map.end()) { |
| return TxnState::NOT_FOUND; |
| } |
| |
| auto& tablet_txn_info_map = it->second; |
| auto tablet_txn_info_iter = tablet_txn_info_map.find(tablet_info); |
| if (tablet_txn_info_iter == tablet_txn_info_map.end()) { |
| return TxnState::NOT_FOUND; |
| } |
| |
| const auto& txn_info = tablet_txn_info_iter->second; |
| return txn_info->state; |
| } |
| |
| } // namespace doris |