| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "agent/task_worker_pool.h" |
| |
| #include <brpc/controller.h> |
| #include <fmt/format.h> |
| #include <gen_cpp/AgentService_types.h> |
| #include <gen_cpp/DataSinks_types.h> |
| #include <gen_cpp/HeartbeatService_types.h> |
| #include <gen_cpp/MasterService_types.h> |
| #include <gen_cpp/Status_types.h> |
| #include <gen_cpp/Types_types.h> |
| #include <unistd.h> |
| |
| #include <algorithm> |
| // IWYU pragma: no_include <bits/chrono.h> |
| #include <thrift/protocol/TDebugProtocol.h> |
| |
| #include <atomic> |
| #include <chrono> // IWYU pragma: keep |
| #include <ctime> |
| #include <functional> |
| #include <memory> |
| #include <mutex> |
| #include <shared_mutex> |
| #include <sstream> |
| #include <string> |
| #include <thread> |
| #include <type_traits> |
| #include <utility> |
| #include <vector> |
| |
| #include "agent/utils.h" |
| #include "cloud/cloud_delete_task.h" |
| #include "cloud/cloud_engine_calc_delete_bitmap_task.h" |
| #include "cloud/cloud_schema_change_job.h" |
| #include "cloud/cloud_snapshot_loader.h" |
| #include "cloud/cloud_snapshot_mgr.h" |
| #include "cloud/cloud_tablet_mgr.h" |
| #include "cloud/config.h" |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "io/fs/file_system.h" |
| #include "io/fs/hdfs_file_system.h" |
| #include "io/fs/local_file_system.h" |
| #include "io/fs/obj_storage_client.h" |
| #include "io/fs/path.h" |
| #include "io/fs/remote_file_system.h" |
| #include "io/fs/s3_file_system.h" |
| #include "olap/cumulative_compaction_time_series_policy.h" |
| #include "olap/data_dir.h" |
| #include "olap/olap_common.h" |
| #include "olap/rowset/rowset_meta.h" |
| #include "olap/snapshot_manager.h" |
| #include "olap/storage_engine.h" |
| #include "olap/storage_policy.h" |
| #include "olap/tablet.h" |
| #include "olap/tablet_manager.h" |
| #include "olap/tablet_meta.h" |
| #include "olap/tablet_schema.h" |
| #include "olap/task/engine_batch_load_task.h" |
| #include "olap/task/engine_checksum_task.h" |
| #include "olap/task/engine_clone_task.h" |
| #include "olap/task/engine_cloud_index_change_task.h" |
| #include "olap/task/engine_index_change_task.h" |
| #include "olap/task/engine_publish_version_task.h" |
| #include "olap/task/engine_storage_migration_task.h" |
| #include "olap/txn_manager.h" |
| #include "olap/utils.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/fragment_mgr.h" |
| #include "runtime/index_policy/index_policy_mgr.h" |
| #include "runtime/memory/global_memory_arbitrator.h" |
| #include "runtime/snapshot_loader.h" |
| #include "service/backend_options.h" |
| #include "util/brpc_client_cache.h" |
| #include "util/debug_points.h" |
| #include "util/doris_metrics.h" |
| #include "util/jni-util.h" |
| #include "util/mem_info.h" |
| #include "util/random.h" |
| #include "util/s3_util.h" |
| #include "util/stopwatch.hpp" |
| #include "util/threadpool.h" |
| #include "util/time.h" |
| #include "util/trace.h" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| using namespace ErrorCode; |
| |
| namespace { |
| |
// Guards `s_task_signatures` below.
std::mutex s_task_signatures_mtx;
// Signatures of in-flight tasks, grouped by task type; used to deduplicate
// repeated task submissions from FE (see register_task_info/remove_task_info).
std::unordered_map<TTaskType::type, std::unordered_set<int64_t>> s_task_signatures;

// Monotonically increasing report version, seeded from the current time so it
// keeps growing across BE restarts.
std::atomic_ulong s_report_version(time(nullptr) * 100000);
| |
// Bump the global report version; called after a task successfully changes
// tablet state so FE can tell a newer report supersedes earlier ones.
void increase_report_version() {
    s_report_version.fetch_add(1, std::memory_order_relaxed);
}
| |
| // FIXME(plat1ko): Paired register and remove task info |
| bool register_task_info(const TTaskType::type task_type, int64_t signature) { |
| if (task_type == TTaskType::type::PUSH_STORAGE_POLICY || |
| task_type == TTaskType::type::PUSH_COOLDOWN_CONF || |
| task_type == TTaskType::type::COMPACTION) { |
| // no need to report task of these types |
| return true; |
| } |
| |
| if (signature == -1) { // No need to report task with unintialized signature |
| return true; |
| } |
| |
| std::lock_guard lock(s_task_signatures_mtx); |
| auto& set = s_task_signatures[task_type]; |
| return set.insert(signature).second; |
| } |
| |
| void remove_task_info(const TTaskType::type task_type, int64_t signature) { |
| size_t queue_size; |
| { |
| std::lock_guard lock(s_task_signatures_mtx); |
| auto& set = s_task_signatures[task_type]; |
| set.erase(signature); |
| queue_size = set.size(); |
| } |
| |
| VLOG_NOTICE << "remove task info. type=" << task_type << ", signature=" << signature |
| << ", queue_size=" << queue_size; |
| } |
| |
| void finish_task(const TFinishTaskRequest& finish_task_request) { |
| // Return result to FE |
| TMasterResult result; |
| uint32_t try_time = 0; |
| constexpr int TASK_FINISH_MAX_RETRY = 3; |
| while (try_time < TASK_FINISH_MAX_RETRY) { |
| DorisMetrics::instance()->finish_task_requests_total->increment(1); |
| Status client_status = |
| MasterServerClient::instance()->finish_task(finish_task_request, &result); |
| |
| if (client_status.ok()) { |
| break; |
| } else { |
| DorisMetrics::instance()->finish_task_requests_failed->increment(1); |
| LOG_WARNING("failed to finish task") |
| .tag("type", finish_task_request.task_type) |
| .tag("signature", finish_task_request.signature) |
| .error(result.status); |
| try_time += 1; |
| } |
| sleep(1); |
| } |
| } |
| |
// Fill `tablet_info` for (tablet_id, schema_hash) via the tablet manager's
// report path. Returns the status of the lookup.
Status get_tablet_info(StorageEngine& engine, const TTabletId tablet_id,
                       const TSchemaHash schema_hash, TTabletInfo* tablet_info) {
    tablet_info->__set_tablet_id(tablet_id);
    tablet_info->__set_schema_hash(schema_hash);
    return engine.tablet_manager()->report_tablet_info(tablet_info);
}
| |
// Sleep for a random duration in [1, second] seconds (Uniform(second) yields
// [0, second)); used to spread retries from different BEs over time.
void random_sleep(int second) {
    Random rnd(static_cast<uint32_t>(UnixMillis()));
    sleep(rnd.Uniform(second) + 1);
}
| |
| void alter_tablet(StorageEngine& engine, const TAgentTaskRequest& agent_task_req, int64_t signature, |
| const TTaskType::type task_type, TFinishTaskRequest* finish_task_request) { |
| Status status; |
| |
| std::string_view process_name = "alter tablet"; |
| // Check last schema change status, if failed delete tablet file |
| // Do not need to adjust delete success or not |
| // Because if delete failed create rollup will failed |
| TTabletId new_tablet_id = 0; |
| TSchemaHash new_schema_hash = 0; |
| if (status.ok()) { |
| new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id; |
| new_schema_hash = agent_task_req.alter_tablet_req_v2.new_schema_hash; |
| auto mem_tracker = MemTrackerLimiter::create_shared( |
| MemTrackerLimiter::Type::SCHEMA_CHANGE, |
| fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}", |
| std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id), |
| std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id), |
| engine.memory_limitation_bytes_per_thread_for_schema_change())); |
| SCOPED_ATTACH_TASK(mem_tracker); |
| DorisMetrics::instance()->create_rollup_requests_total->increment(1); |
| Status res = Status::OK(); |
| try { |
| LOG_INFO("start {}", process_name) |
| .tag("signature", agent_task_req.signature) |
| .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) |
| .tag("new_tablet_id", new_tablet_id) |
| .tag("mem_limit", |
| engine.memory_limitation_bytes_per_thread_for_schema_change()); |
| SchemaChangeJob job(engine, agent_task_req.alter_tablet_req_v2, |
| std::to_string(agent_task_req.alter_tablet_req_v2.__isset.job_id |
| ? agent_task_req.alter_tablet_req_v2.job_id |
| : 0)); |
| status = job.process_alter_tablet(agent_task_req.alter_tablet_req_v2); |
| } catch (const Exception& e) { |
| status = e.to_status(); |
| } |
| if (!status.ok()) { |
| DorisMetrics::instance()->create_rollup_requests_failed->increment(1); |
| } |
| } |
| |
| if (status.ok()) { |
| increase_report_version(); |
| } |
| |
| // Return result to fe |
| finish_task_request->__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request->__set_report_version(s_report_version); |
| finish_task_request->__set_task_type(task_type); |
| finish_task_request->__set_signature(signature); |
| |
| std::vector<TTabletInfo> finish_tablet_infos; |
| if (status.ok()) { |
| TTabletInfo tablet_info; |
| status = get_tablet_info(engine, new_tablet_id, new_schema_hash, &tablet_info); |
| if (status.ok()) { |
| finish_tablet_infos.push_back(tablet_info); |
| } |
| } |
| |
| if (!status.ok() && !status.is<NOT_IMPLEMENTED_ERROR>()) { |
| LOG_WARNING("failed to {}", process_name) |
| .tag("signature", agent_task_req.signature) |
| .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) |
| .tag("new_tablet_id", new_tablet_id) |
| .error(status); |
| } else { |
| finish_task_request->__set_finish_tablet_infos(finish_tablet_infos); |
| LOG_INFO("successfully {}", process_name) |
| .tag("signature", agent_task_req.signature) |
| .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) |
| .tag("new_tablet_id", new_tablet_id); |
| } |
| finish_task_request->__set_task_status(status.to_thrift()); |
| } |
| |
| void alter_cloud_tablet(CloudStorageEngine& engine, const TAgentTaskRequest& agent_task_req, |
| int64_t signature, const TTaskType::type task_type, |
| TFinishTaskRequest* finish_task_request) { |
| Status status; |
| |
| std::string_view process_name = "alter tablet"; |
| // Check last schema change status, if failed delete tablet file |
| // Do not need to adjust delete success or not |
| // Because if delete failed create rollup will failed |
| TTabletId new_tablet_id = 0; |
| new_tablet_id = agent_task_req.alter_tablet_req_v2.new_tablet_id; |
| auto mem_tracker = MemTrackerLimiter::create_shared( |
| MemTrackerLimiter::Type::SCHEMA_CHANGE, |
| fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}", |
| std::to_string(agent_task_req.alter_tablet_req_v2.base_tablet_id), |
| std::to_string(agent_task_req.alter_tablet_req_v2.new_tablet_id), |
| engine.memory_limitation_bytes_per_thread_for_schema_change())); |
| SCOPED_ATTACH_TASK(mem_tracker); |
| DorisMetrics::instance()->create_rollup_requests_total->increment(1); |
| |
| LOG_INFO("start {}", process_name) |
| .tag("signature", agent_task_req.signature) |
| .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) |
| .tag("new_tablet_id", new_tablet_id) |
| .tag("mem_limit", engine.memory_limitation_bytes_per_thread_for_schema_change()); |
| DCHECK(agent_task_req.alter_tablet_req_v2.__isset.job_id); |
| CloudSchemaChangeJob job(engine, std::to_string(agent_task_req.alter_tablet_req_v2.job_id), |
| agent_task_req.alter_tablet_req_v2.expiration); |
| status = [&]() { |
| HANDLE_EXCEPTION_IF_CATCH_EXCEPTION( |
| job.process_alter_tablet(agent_task_req.alter_tablet_req_v2), |
| [&](const doris::Exception& ex) { |
| DorisMetrics::instance()->create_rollup_requests_failed->increment(1); |
| job.clean_up_on_failure(); |
| }); |
| return Status::OK(); |
| }(); |
| |
| if (status.ok()) { |
| increase_report_version(); |
| LOG_INFO("successfully {}", process_name) |
| .tag("signature", agent_task_req.signature) |
| .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) |
| .tag("new_tablet_id", new_tablet_id); |
| } else { |
| LOG_WARNING("failed to {}", process_name) |
| .tag("signature", agent_task_req.signature) |
| .tag("base_tablet_id", agent_task_req.alter_tablet_req_v2.base_tablet_id) |
| .tag("new_tablet_id", new_tablet_id) |
| .error(status); |
| } |
| |
| // Return result to fe |
| finish_task_request->__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request->__set_report_version(s_report_version); |
| finish_task_request->__set_task_type(task_type); |
| finish_task_request->__set_signature(signature); |
| finish_task_request->__set_task_status(status.to_thrift()); |
| } |
| |
| Status check_migrate_request(StorageEngine& engine, const TStorageMediumMigrateReq& req, |
| TabletSharedPtr& tablet, DataDir** dest_store) { |
| int64_t tablet_id = req.tablet_id; |
| tablet = engine.tablet_manager()->get_tablet(tablet_id); |
| if (tablet == nullptr) { |
| return Status::InternalError("could not find tablet {}", tablet_id); |
| } |
| |
| if (req.__isset.data_dir) { |
| // request specify the data dir |
| *dest_store = engine.get_store(req.data_dir); |
| if (*dest_store == nullptr) { |
| return Status::InternalError("could not find data dir {}", req.data_dir); |
| } |
| } else { |
| // this is a storage medium |
| // get data dir by storage medium |
| |
| // judge case when no need to migrate |
| uint32_t count = engine.available_storage_medium_type_count(); |
| if (count <= 1) { |
| return Status::InternalError("available storage medium type count is less than 1"); |
| } |
| // check current tablet storage medium |
| TStorageMedium::type storage_medium = req.storage_medium; |
| TStorageMedium::type src_storage_medium = tablet->data_dir()->storage_medium(); |
| if (src_storage_medium == storage_medium) { |
| return Status::InternalError("tablet is already on specified storage medium {}", |
| storage_medium); |
| } |
| // get a random store of specified storage medium |
| auto stores = engine.get_stores_for_create_tablet(tablet->partition_id(), storage_medium); |
| if (stores.empty()) { |
| return Status::InternalError("failed to get root path for create tablet"); |
| } |
| |
| *dest_store = stores[0]; |
| } |
| if (tablet->data_dir()->path() == (*dest_store)->path()) { |
| LOG_WARNING("tablet is already on specified path").tag("path", tablet->data_dir()->path()); |
| return Status::Error<FILE_ALREADY_EXIST, false>("tablet is already on specified path: {}", |
| tablet->data_dir()->path()); |
| } |
| |
| // check local disk capacity |
| int64_t tablet_size = tablet->tablet_local_size(); |
| if ((*dest_store)->reach_capacity_limit(tablet_size)) { |
| return Status::Error<EXCEEDED_LIMIT>("reach the capacity limit of path {}, tablet_size={}", |
| (*dest_store)->path(), tablet_size); |
| } |
| return Status::OK(); |
| } |
| |
| // Return `true` if report success |
| bool handle_report(const TReportRequest& request, const ClusterInfo* cluster_info, |
| std::string_view name) { |
| TMasterResult result; |
| Status status = MasterServerClient::instance()->report(request, &result); |
| if (!status.ok()) [[unlikely]] { |
| LOG_WARNING("failed to report {}", name) |
| .tag("host", cluster_info->master_fe_addr.hostname) |
| .tag("port", cluster_info->master_fe_addr.port) |
| .error(status); |
| return false; |
| } |
| |
| else if (result.status.status_code != TStatusCode::OK) [[unlikely]] { |
| LOG_WARNING("failed to report {}", name) |
| .tag("host", cluster_info->master_fe_addr.hostname) |
| .tag("port", cluster_info->master_fe_addr.port) |
| .error(result.status); |
| return false; |
| } |
| |
| return true; |
| } |
| |
// Register the task's (type, signature) pair for deduplication, stamp its
// receive time, and hand it to `submit_op` (the pool-specific enqueue).
// Duplicate submissions return OK without being submitted again.
Status _submit_task(const TAgentTaskRequest& task,
                    std::function<Status(const TAgentTaskRequest&)> submit_op) {
    const TTaskType::type task_type = task.task_type;
    int64_t signature = task.signature;

    std::string type_str;
    EnumToString(TTaskType, task_type, type_str);
    VLOG_CRITICAL << "submitting task. type=" << type_str << ", signature=" << signature;

    if (!register_task_info(task_type, signature)) {
        LOG_WARNING("failed to register task").tag("type", type_str).tag("signature", signature);
        // Duplicated task request, just return OK
        return Status::OK();
    }

    // TODO(plat1ko): check task request member

    // Set the receiving time of task so that we can determine whether it is timed out later
    // exist a path task_worker_pool <- agent_server <- backend_service <- BackendService
    // use the arg BackendService_submit_tasks_args.tasks is not const, so modify is ok
    (const_cast<TAgentTaskRequest&>(task)).__set_recv_time(time(nullptr));
    auto st = submit_op(task);
    if (!st.ok()) [[unlikely]] {
        LOG_INFO("failed to submit task").tag("type", type_str).tag("signature", signature);
        return st;
    }

    LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature);
    return Status::OK();
}
| |
bvar::LatencyRecorder g_publish_version_latency("doris_pk", "publish_version");

// Per-task-type in-flight counters, incremented on submit and decremented on
// completion by add_task_count().
bvar::Adder<uint64_t> ALTER_INVERTED_INDEX_count("task", "ALTER_INVERTED_INDEX");
bvar::Adder<uint64_t> CHECK_CONSISTENCY_count("task", "CHECK_CONSISTENCY");
bvar::Adder<uint64_t> UPLOAD_count("task", "UPLOAD");
bvar::Adder<uint64_t> DOWNLOAD_count("task", "DOWNLOAD");
bvar::Adder<uint64_t> MAKE_SNAPSHOT_count("task", "MAKE_SNAPSHOT");
bvar::Adder<uint64_t> RELEASE_SNAPSHOT_count("task", "RELEASE_SNAPSHOT");
bvar::Adder<uint64_t> MOVE_count("task", "MOVE");
bvar::Adder<uint64_t> COMPACTION_count("task", "COMPACTION");
bvar::Adder<uint64_t> PUSH_STORAGE_POLICY_count("task", "PUSH_STORAGE_POLICY");
bvar::Adder<uint64_t> PUSH_INDEX_POLICY_count("task", "PUSH_INDEX_POLICY");
bvar::Adder<uint64_t> PUSH_COOLDOWN_CONF_count("task", "PUSH_COOLDOWN_CONF");
bvar::Adder<uint64_t> CREATE_count("task", "CREATE_TABLE");
bvar::Adder<uint64_t> DROP_count("task", "DROP_TABLE");
bvar::Adder<uint64_t> PUBLISH_VERSION_count("task", "PUBLISH_VERSION");
bvar::Adder<uint64_t> CLEAR_TRANSACTION_TASK_count("task", "CLEAR_TRANSACTION_TASK");
bvar::Adder<uint64_t> DELETE_count("task", "DELETE");
bvar::Adder<uint64_t> PUSH_count("task", "PUSH");
bvar::Adder<uint64_t> UPDATE_TABLET_META_INFO_count("task", "UPDATE_TABLET_META_INFO");
bvar::Adder<uint64_t> ALTER_count("task", "ALTER_TABLE");
bvar::Adder<uint64_t> CLONE_count("task", "CLONE");
bvar::Adder<uint64_t> STORAGE_MEDIUM_MIGRATE_count("task", "STORAGE_MEDIUM_MIGRATE");
bvar::Adder<uint64_t> GC_BINLOG_count("task", "GC_BINLOG");
bvar::Adder<uint64_t> UPDATE_VISIBLE_VERSION_count("task", "UPDATE_VISIBLE_VERSION");
bvar::Adder<uint64_t> CALCULATE_DELETE_BITMAP_count("task", "CALCULATE_DELETE_BITMAP");
| |
// Adjust the per-task-type in-flight gauge by `n` (+1 on submit, -1 when the
// task finishes). The macro pastes the enum name onto the matching
// `<TYPE>_count` bvar above; task types without a counter fall through to the
// default case and are not tracked.
void add_task_count(const TAgentTaskRequest& task, int n) {
    // clang-format off
    switch (task.task_type) {
    #define ADD_TASK_COUNT(type) \
    case TTaskType::type:        \
        type##_count << n;       \
        return;
    ADD_TASK_COUNT(ALTER_INVERTED_INDEX)
    ADD_TASK_COUNT(CHECK_CONSISTENCY)
    ADD_TASK_COUNT(UPLOAD)
    ADD_TASK_COUNT(DOWNLOAD)
    ADD_TASK_COUNT(MAKE_SNAPSHOT)
    ADD_TASK_COUNT(RELEASE_SNAPSHOT)
    ADD_TASK_COUNT(MOVE)
    ADD_TASK_COUNT(COMPACTION)
    ADD_TASK_COUNT(PUSH_STORAGE_POLICY)
    ADD_TASK_COUNT(PUSH_INDEX_POLICY)
    ADD_TASK_COUNT(PUSH_COOLDOWN_CONF)
    ADD_TASK_COUNT(CREATE)
    ADD_TASK_COUNT(DROP)
    ADD_TASK_COUNT(PUBLISH_VERSION)
    ADD_TASK_COUNT(CLEAR_TRANSACTION_TASK)
    ADD_TASK_COUNT(UPDATE_TABLET_META_INFO)
    ADD_TASK_COUNT(CLONE)
    ADD_TASK_COUNT(STORAGE_MEDIUM_MIGRATE)
    ADD_TASK_COUNT(GC_BINLOG)
    ADD_TASK_COUNT(UPDATE_VISIBLE_VERSION)
    ADD_TASK_COUNT(CALCULATE_DELETE_BITMAP)
    #undef ADD_TASK_COUNT
    case TTaskType::REALTIME_PUSH:
    case TTaskType::PUSH:
        // PUSH-style requests are split by push type: loads vs deletes.
        if (task.push_req.push_type == TPushType::LOAD_V2) {
            PUSH_count << n;
        } else if (task.push_req.push_type == TPushType::DELETE) {
            DELETE_count << n;
        }
        return;
    case TTaskType::ALTER:
    {
        ALTER_count << n;
        // cloud auto stop need sc jobs, a tablet's sc can also be considered a fragment
        if (n > 0) {
            // only count fragment when task is actually starting
            doris::g_fragment_executing_count << 1;
            int64_t now = duration_cast<std::chrono::milliseconds>(
                                  std::chrono::system_clock::now().time_since_epoch())
                                  .count();
            g_fragment_last_active_time.set_value(now);
        }
        return;
    }
    default:
        return;
    }
    // clang-format on
}
| |
// Attempt/failure counters for each kind of periodic report sent to FE.
bvar::Adder<uint64_t> report_task_total("report", "task_total");
bvar::Adder<uint64_t> report_task_failed("report", "task_failed");
bvar::Adder<uint64_t> report_disk_total("report", "disk_total");
bvar::Adder<uint64_t> report_disk_failed("report", "disk_failed");
bvar::Adder<uint64_t> report_tablet_total("report", "tablet_total");
bvar::Adder<uint64_t> report_tablet_failed("report", "tablet_failed");
bvar::Adder<uint64_t> report_index_policy_total("report", "index_policy_total");
bvar::Adder<uint64_t> report_index_policy_failed("report", "index_policy_failed");
| |
| } // namespace |
| |
| TaskWorkerPool::TaskWorkerPool(std::string_view name, int worker_count, |
| std::function<void(const TAgentTaskRequest& task)> callback) |
| : _callback(std::move(callback)) { |
| auto st = ThreadPoolBuilder(fmt::format("TaskWP_{}", name)) |
| .set_min_threads(worker_count) |
| .set_max_threads(worker_count) |
| .build(&_thread_pool); |
| CHECK(st.ok()) << name << ": " << st; |
| } |
| |
TaskWorkerPool::~TaskWorkerPool() {
    // Ensure the pool is shut down even if stop() was never called explicitly.
    stop();
}
| |
| void TaskWorkerPool::stop() { |
| if (_stopped.exchange(true)) { |
| return; |
| } |
| |
| if (_thread_pool) { |
| _thread_pool->shutdown(); |
| } |
| } |
| |
| Status TaskWorkerPool::submit_task(const TAgentTaskRequest& task) { |
| return _submit_task(task, [this](auto&& task) { |
| add_task_count(task, 1); |
| return _thread_pool->submit_func([this, task]() { |
| _callback(task); |
| add_task_count(task, -1); |
| }); |
| }); |
| } |
| |
| PriorTaskWorkerPool::PriorTaskWorkerPool( |
| const std::string& name, int normal_worker_count, int high_prior_worker_count, |
| std::function<void(const TAgentTaskRequest& task)> callback) |
| : _callback(std::move(callback)) { |
| for (int i = 0; i < normal_worker_count; ++i) { |
| auto st = Thread::create( |
| "Normal", name, [this] { normal_loop(); }, &_workers.emplace_back()); |
| CHECK(st.ok()) << name << ": " << st; |
| } |
| |
| for (int i = 0; i < high_prior_worker_count; ++i) { |
| auto st = Thread::create( |
| "HighPrior", name, [this] { high_prior_loop(); }, &_workers.emplace_back()); |
| CHECK(st.ok()) << name << ": " << st; |
| } |
| } |
| |
PriorTaskWorkerPool::~PriorTaskWorkerPool() {
    // Ensure worker threads are stopped and joined even without an explicit stop().
    stop();
}
| |
| void PriorTaskWorkerPool::stop() { |
| { |
| std::lock_guard lock(_mtx); |
| if (_stopped) { |
| return; |
| } |
| |
| _stopped = true; |
| } |
| _normal_condv.notify_all(); |
| _high_prior_condv.notify_all(); |
| |
| for (auto&& w : _workers) { |
| if (w) { |
| w->join(); |
| } |
| } |
| } |
| |
| Status PriorTaskWorkerPool::submit_task(const TAgentTaskRequest& task) { |
| return _submit_task(task, [this](auto&& task) { |
| auto req = std::make_unique<TAgentTaskRequest>(task); |
| add_task_count(*req, 1); |
| if (req->__isset.priority && req->priority == TPriority::HIGH) { |
| std::lock_guard lock(_mtx); |
| _high_prior_queue.push_back(std::move(req)); |
| _high_prior_condv.notify_one(); |
| _normal_condv.notify_one(); |
| } else { |
| std::lock_guard lock(_mtx); |
| _normal_queue.push_back(std::move(req)); |
| _normal_condv.notify_one(); |
| } |
| return Status::OK(); |
| }); |
| } |
| |
// Submit a HIGH-priority task. If the same signature is not yet registered it
// is registered and enqueued directly on the high-prior queue. If it IS
// registered and still waiting in the normal queue, the normal-queue entry is
// cancelled and replaced by this high-prior request. If it is registered but
// not found in the normal queue (already in the high-prior queue or running),
// nothing is re-queued.
Status PriorTaskWorkerPool::submit_high_prior_and_cancel_low(TAgentTaskRequest& task) {
    const TTaskType::type task_type = task.task_type;
    int64_t signature = task.signature;
    std::string type_str;
    EnumToString(TTaskType, task_type, type_str);
    auto req = std::make_unique<TAgentTaskRequest>(task);

    DCHECK(req->__isset.priority && req->priority == TPriority::HIGH);
    do {
        // Hold the global dedup-set lock for the whole check-then-act sequence
        // so no concurrent submit can race with the signature test below.
        std::lock_guard lock(s_task_signatures_mtx);
        auto& set = s_task_signatures[task_type];
        if (!set.contains(signature)) {
            // If it doesn't exist, put it directly into the priority queue
            add_task_count(*req, 1);
            set.insert(signature);
            std::lock_guard temp_lock(_mtx);
            _high_prior_queue.push_back(std::move(req));
            _high_prior_condv.notify_one();
            _normal_condv.notify_one();
            break;
        } else {
            std::lock_guard temp_lock(_mtx);
            for (auto it = _normal_queue.begin(); it != _normal_queue.end();) {
                // If it exists in the normal queue, cancel the task in the normal queue
                if ((*it)->signature == signature) {
                    _normal_queue.erase(it); // cancel the original task
                    _high_prior_queue.push_back(std::move(req)); // add the new task to the queue
                    _high_prior_condv.notify_one();
                    _normal_condv.notify_one();
                    break;
                } else {
                    ++it; // doesn't meet the condition, continue to the next one
                }
            }
            // If it exists in the high priority queue, no operation is needed
            // NOTE(review): this message is also logged when the task was just
            // moved from the normal queue above — confirm whether that is intended.
            LOG_INFO("task has already existed in high prior queue.").tag("signature", signature);
        }
    } while (false);

    // Set the receiving time of task so that we can determine whether it is timed out later
    task.__set_recv_time(time(nullptr));

    LOG_INFO("successfully submit task").tag("type", type_str).tag("signature", signature);
    return Status::OK();
}
| |
| void PriorTaskWorkerPool::normal_loop() { |
| while (true) { |
| std::unique_ptr<TAgentTaskRequest> req; |
| |
| { |
| std::unique_lock lock(_mtx); |
| _normal_condv.wait(lock, [&] { |
| return !_normal_queue.empty() || !_high_prior_queue.empty() || _stopped; |
| }); |
| |
| if (_stopped) { |
| return; |
| } |
| |
| if (!_high_prior_queue.empty()) { |
| req = std::move(_high_prior_queue.front()); |
| _high_prior_queue.pop_front(); |
| } else if (!_normal_queue.empty()) { |
| req = std::move(_normal_queue.front()); |
| _normal_queue.pop_front(); |
| } else { |
| continue; |
| } |
| } |
| |
| _callback(*req); |
| add_task_count(*req, -1); |
| } |
| } |
| |
| void PriorTaskWorkerPool::high_prior_loop() { |
| while (true) { |
| std::unique_ptr<TAgentTaskRequest> req; |
| |
| { |
| std::unique_lock lock(_mtx); |
| _high_prior_condv.wait(lock, [&] { return !_high_prior_queue.empty() || _stopped; }); |
| |
| if (_stopped) { |
| return; |
| } |
| |
| if (_high_prior_queue.empty()) { |
| continue; |
| } |
| |
| req = std::move(_high_prior_queue.front()); |
| _high_prior_queue.pop_front(); |
| } |
| |
| _callback(*req); |
| add_task_count(*req, -1); |
| } |
| } |
| |
// Start a background thread that invokes `callback` every `report_interval_s`
// seconds, or earlier when notify() sets `_signal`. The worker registers
// itself with the storage engine as a report listener for its lifetime.
// `cluster_info` is captured by pointer and must outlive this worker.
ReportWorker::ReportWorker(std::string name, const ClusterInfo* cluster_info, int report_interval_s,
                           std::function<void()> callback)
        : _name(std::move(name)) {
    auto report_loop = [this, cluster_info, report_interval_s, callback = std::move(callback)] {
        auto& engine = ExecEnv::GetInstance()->storage_engine();
        engine.register_report_listener(this);
        while (true) {
            {
                std::unique_lock lock(_mtx);
                // Wake on stop, on an explicit notify(), or after the interval elapses
                _condv.wait_for(lock, std::chrono::seconds(report_interval_s),
                                [&] { return _stopped || _signal; });

                if (_stopped) {
                    break;
                }

                if (_signal) {
                    // Consume received signal
                    _signal = false;
                }
            }

            if (cluster_info->master_fe_addr.port == 0) {
                // port == 0 means not received heartbeat yet
                LOG(INFO) << "waiting to receive first heartbeat from frontend before doing report";
                continue;
            }

            callback();
        }
        engine.deregister_report_listener(this);
    };

    auto st = Thread::create("ReportWorker", _name, report_loop, &_thread);
    CHECK(st.ok()) << _name << ": " << st;
}
| |
ReportWorker::~ReportWorker() {
    // Stop and join the report thread even if stop() was never called explicitly.
    stop();
}
| |
| void ReportWorker::notify() { |
| { |
| std::lock_guard lock(_mtx); |
| _signal = true; |
| } |
| _condv.notify_all(); |
| } |
| |
| void ReportWorker::stop() { |
| { |
| std::lock_guard lock(_mtx); |
| if (_stopped) { |
| return; |
| } |
| |
| _stopped = true; |
| } |
| _condv.notify_all(); |
| if (_thread) { |
| _thread->join(); |
| } |
| } |
| |
| void alter_cloud_index_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) { |
| const auto& alter_inverted_index_rq = req.alter_inverted_index_req; |
| LOG(INFO) << "[index_change]get alter index task. signature=" << req.signature |
| << ", tablet_id=" << alter_inverted_index_rq.tablet_id |
| << ", job_id=" << alter_inverted_index_rq.job_id; |
| |
| Status status = Status::OK(); |
| auto tablet_ptr = engine.tablet_mgr().get_tablet(alter_inverted_index_rq.tablet_id); |
| if (tablet_ptr != nullptr) { |
| EngineCloudIndexChangeTask engine_task(engine, req.alter_inverted_index_req); |
| status = engine_task.execute(); |
| } else { |
| status = Status::NotFound("could not find tablet {}", alter_inverted_index_rq.tablet_id); |
| } |
| |
| // Return result to fe |
| TFinishTaskRequest finish_task_request; |
| finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request.__set_task_type(req.task_type); |
| finish_task_request.__set_signature(req.signature); |
| if (!status.ok()) { |
| LOG(WARNING) << "[index_change]failed to alter inverted index task, signature=" |
| << req.signature << ", tablet_id=" << alter_inverted_index_rq.tablet_id |
| << ", job_id=" << alter_inverted_index_rq.job_id << ", error=" << status; |
| } else { |
| LOG(INFO) << "[index_change]successfully alter inverted index task, signature=" |
| << req.signature << ", tablet_id=" << alter_inverted_index_rq.tablet_id |
| << ", job_id=" << alter_inverted_index_rq.job_id; |
| } |
| finish_task_request.__set_task_status(status.to_thrift()); |
| finish_task(finish_task_request); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
| void alter_inverted_index_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
| const auto& alter_inverted_index_rq = req.alter_inverted_index_req; |
| LOG(INFO) << "get alter inverted index task. signature=" << req.signature |
| << ", tablet_id=" << alter_inverted_index_rq.tablet_id |
| << ", job_id=" << alter_inverted_index_rq.job_id; |
| |
| Status status = Status::OK(); |
| auto tablet_ptr = engine.tablet_manager()->get_tablet(alter_inverted_index_rq.tablet_id); |
| if (tablet_ptr != nullptr) { |
| EngineIndexChangeTask engine_task(engine, alter_inverted_index_rq); |
| SCOPED_ATTACH_TASK(engine_task.mem_tracker()); |
| status = engine_task.execute(); |
| } else { |
| status = Status::NotFound("could not find tablet {}", alter_inverted_index_rq.tablet_id); |
| } |
| |
| // Return result to fe |
| TFinishTaskRequest finish_task_request; |
| finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request.__set_task_type(req.task_type); |
| finish_task_request.__set_signature(req.signature); |
| std::vector<TTabletInfo> finish_tablet_infos; |
| if (!status.ok()) { |
| LOG(WARNING) << "failed to alter inverted index task, signature=" << req.signature |
| << ", tablet_id=" << alter_inverted_index_rq.tablet_id |
| << ", job_id=" << alter_inverted_index_rq.job_id << ", error=" << status; |
| } else { |
| LOG(INFO) << "successfully alter inverted index task, signature=" << req.signature |
| << ", tablet_id=" << alter_inverted_index_rq.tablet_id |
| << ", job_id=" << alter_inverted_index_rq.job_id; |
| TTabletInfo tablet_info; |
| status = get_tablet_info(engine, alter_inverted_index_rq.tablet_id, |
| alter_inverted_index_rq.schema_hash, &tablet_info); |
| if (status.ok()) { |
| finish_tablet_infos.push_back(tablet_info); |
| } |
| finish_task_request.__set_finish_tablet_infos(finish_tablet_infos); |
| } |
| finish_task_request.__set_task_status(status.to_thrift()); |
| finish_task(finish_task_request); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
// Applies tablet-meta updates pushed from FE (req.update_tablet_meta_info_req)
// to the local tablets: partition-id fixes, storage policy, in-memory flag,
// compaction policy and its time-series tuning knobs, replica id, binlog
// config, and several tablet-schema flags. The meta is persisted once per
// tablet if anything changed, then completion is reported back to FE.
//
// Error semantics: `status` holds the *last* failure seen across all tablets;
// a failure for one tablet (or one field) does `continue`, which skips the
// remaining fields AND the save for that tablet, but processing of the other
// tablets carries on. FE therefore sees a single aggregated status.
void update_tablet_meta_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    LOG(INFO) << "get update tablet meta task. signature=" << req.signature;

    Status status;
    const auto& update_tablet_meta_req = req.update_tablet_meta_info_req;
    for (const auto& tablet_meta_info : update_tablet_meta_req.tabletMetaInfos) {
        auto tablet = engine.tablet_manager()->get_tablet(tablet_meta_info.tablet_id);
        if (tablet == nullptr) {
            status = Status::NotFound("tablet not found");
            LOG(WARNING) << "could not find tablet when update tablet meta. tablet_id="
                         << tablet_meta_info.tablet_id;
            continue;
        }
        // Set when at least one field was applied; gates the save_meta() below.
        bool need_to_save = false;
        if (tablet_meta_info.__isset.partition_id) {
            // for fix partition_id = 0
            // NOTE(review): log message lacks a separator between the tablet id
            // and "partition id" — cosmetic only.
            LOG(WARNING) << "change be tablet id: " << tablet->tablet_meta()->tablet_id()
                         << "partition id from : " << tablet->tablet_meta()->partition_id()
                         << " to : " << tablet_meta_info.partition_id;
            auto succ = engine.tablet_manager()->update_tablet_partition_id(
                    tablet_meta_info.partition_id, tablet->tablet_meta()->tablet_id());
            if (!succ) {
                std::string err_msg = fmt::format(
                        "change be tablet id : {} partition_id : {} failed",
                        tablet->tablet_meta()->tablet_id(), tablet_meta_info.partition_id);
                LOG(WARNING) << err_msg;
                status = Status::InvalidArgument(err_msg);
                continue;
            }
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.storage_policy_id) {
            tablet->tablet_meta()->set_storage_policy_id(tablet_meta_info.storage_policy_id);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.is_in_memory) {
            tablet->tablet_meta()->mutable_tablet_schema()->set_is_in_memory(
                    tablet_meta_info.is_in_memory);
            // Header lock protects the rowset-meta map and cached schema while
            // the flag is propagated to every rowset's schema copy.
            std::shared_lock rlock(tablet->get_header_lock());
            for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
                rowset_meta->tablet_schema()->set_is_in_memory(tablet_meta_info.is_in_memory);
            }
            tablet->tablet_schema_unlocked()->set_is_in_memory(tablet_meta_info.is_in_memory);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.compaction_policy) {
            // Only the two known policies are accepted; anything else is rejected.
            if (tablet_meta_info.compaction_policy != CUMULATIVE_SIZE_BASED_POLICY &&
                tablet_meta_info.compaction_policy != CUMULATIVE_TIME_SERIES_POLICY) {
                status = Status::InvalidArgument(
                        "invalid compaction policy, only support for size_based or "
                        "time_series");
                continue;
            }
            tablet->tablet_meta()->set_compaction_policy(tablet_meta_info.compaction_policy);
            need_to_save = true;
        }
        // The five time-series knobs below are each only valid when the tablet
        // is already on the time-series compaction policy.
        if (tablet_meta_info.__isset.time_series_compaction_goal_size_mbytes) {
            if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
                status = Status::InvalidArgument(
                        "only time series compaction policy support time series config");
                continue;
            }
            tablet->tablet_meta()->set_time_series_compaction_goal_size_mbytes(
                    tablet_meta_info.time_series_compaction_goal_size_mbytes);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.time_series_compaction_file_count_threshold) {
            if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
                status = Status::InvalidArgument(
                        "only time series compaction policy support time series config");
                continue;
            }
            tablet->tablet_meta()->set_time_series_compaction_file_count_threshold(
                    tablet_meta_info.time_series_compaction_file_count_threshold);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.time_series_compaction_time_threshold_seconds) {
            if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
                status = Status::InvalidArgument(
                        "only time series compaction policy support time series config");
                continue;
            }
            tablet->tablet_meta()->set_time_series_compaction_time_threshold_seconds(
                    tablet_meta_info.time_series_compaction_time_threshold_seconds);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.time_series_compaction_empty_rowsets_threshold) {
            if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
                status = Status::InvalidArgument(
                        "only time series compaction policy support time series config");
                continue;
            }
            tablet->tablet_meta()->set_time_series_compaction_empty_rowsets_threshold(
                    tablet_meta_info.time_series_compaction_empty_rowsets_threshold);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.time_series_compaction_level_threshold) {
            if (tablet->tablet_meta()->compaction_policy() != CUMULATIVE_TIME_SERIES_POLICY) {
                status = Status::InvalidArgument(
                        "only time series compaction policy support time series config");
                continue;
            }
            tablet->tablet_meta()->set_time_series_compaction_level_threshold(
                    tablet_meta_info.time_series_compaction_level_threshold);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.replica_id) {
            // NOTE(review): this branch intentionally (?) does not set
            // need_to_save — replica id change alone does not trigger a
            // save_meta(); confirm this is the intended persistence behavior.
            tablet->tablet_meta()->set_replica_id(tablet_meta_info.replica_id);
        }
        if (tablet_meta_info.__isset.binlog_config) {
            // check binlog_config require fields: enable, ttl_seconds, max_bytes, max_history_nums
            const auto& t_binlog_config = tablet_meta_info.binlog_config;
            if (!t_binlog_config.__isset.enable || !t_binlog_config.__isset.ttl_seconds ||
                !t_binlog_config.__isset.max_bytes || !t_binlog_config.__isset.max_history_nums) {
                status = Status::InvalidArgument("invalid binlog config, some fields not set");
                LOG(WARNING) << fmt::format(
                        "invalid binlog config, some fields not set, tablet_id={}, "
                        "t_binlog_config={}",
                        tablet_meta_info.tablet_id,
                        apache::thrift::ThriftDebugString(t_binlog_config));
                continue;
            }

            BinlogConfig new_binlog_config;
            new_binlog_config = tablet_meta_info.binlog_config;
            LOG(INFO) << fmt::format(
                    "update tablet meta binlog config. tablet_id={}, old_binlog_config={}, "
                    "new_binlog_config={}",
                    tablet_meta_info.tablet_id, tablet->tablet_meta()->binlog_config().to_string(),
                    new_binlog_config.to_string());
            tablet->set_binlog_config(new_binlog_config);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.enable_single_replica_compaction) {
            // Propagate the flag to the tablet schema, every rowset's schema
            // copy, and the cached schema, all under the header lock.
            std::shared_lock rlock(tablet->get_header_lock());
            tablet->tablet_meta()->mutable_tablet_schema()->set_enable_single_replica_compaction(
                    tablet_meta_info.enable_single_replica_compaction);
            for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
                rowset_meta->tablet_schema()->set_enable_single_replica_compaction(
                        tablet_meta_info.enable_single_replica_compaction);
            }
            tablet->tablet_schema_unlocked()->set_enable_single_replica_compaction(
                    tablet_meta_info.enable_single_replica_compaction);
            need_to_save = true;
        }
        if (tablet_meta_info.__isset.disable_auto_compaction) {
            std::shared_lock rlock(tablet->get_header_lock());
            tablet->tablet_meta()->mutable_tablet_schema()->set_disable_auto_compaction(
                    tablet_meta_info.disable_auto_compaction);
            for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
                rowset_meta->tablet_schema()->set_disable_auto_compaction(
                        tablet_meta_info.disable_auto_compaction);
            }
            tablet->tablet_schema_unlocked()->set_disable_auto_compaction(
                    tablet_meta_info.disable_auto_compaction);
            need_to_save = true;
        }

        if (tablet_meta_info.__isset.skip_write_index_on_load) {
            std::shared_lock rlock(tablet->get_header_lock());
            tablet->tablet_meta()->mutable_tablet_schema()->set_skip_write_index_on_load(
                    tablet_meta_info.skip_write_index_on_load);
            for (auto& [_, rowset_meta] : tablet->tablet_meta()->all_mutable_rs_metas()) {
                rowset_meta->tablet_schema()->set_skip_write_index_on_load(
                        tablet_meta_info.skip_write_index_on_load);
            }
            tablet->tablet_schema_unlocked()->set_skip_write_index_on_load(
                    tablet_meta_info.skip_write_index_on_load);
            need_to_save = true;
        }
        if (need_to_save) {
            // Persist the updated meta; save_meta() is called under the shared
            // header lock, matching the locking pattern used above.
            std::shared_lock rlock(tablet->get_header_lock());
            tablet->save_meta();
        }
    }

    LOG(INFO) << "finish update tablet meta task. signature=" << req.signature;
    // signature == -1 marks an internally-generated request with no FE task
    // to acknowledge, so the finish/remove round-trip is skipped.
    if (req.signature != -1) {
        TFinishTaskRequest finish_task_request;
        finish_task_request.__set_task_status(status.to_thrift());
        finish_task_request.__set_backend(BackendOptions::get_local_backend());
        finish_task_request.__set_task_type(req.task_type);
        finish_task_request.__set_signature(req.signature);
        finish_task(finish_task_request);
        remove_task_info(req.task_type, req.signature);
    }
}
| |
| void check_consistency_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
| uint32_t checksum = 0; |
| const auto& check_consistency_req = req.check_consistency_req; |
| EngineChecksumTask engine_task(engine, check_consistency_req.tablet_id, |
| check_consistency_req.schema_hash, check_consistency_req.version, |
| &checksum); |
| SCOPED_ATTACH_TASK(engine_task.mem_tracker()); |
| Status status = engine_task.execute(); |
| if (!status.ok()) { |
| LOG_WARNING("failed to check consistency") |
| .tag("signature", req.signature) |
| .tag("tablet_id", check_consistency_req.tablet_id) |
| .error(status); |
| } else { |
| LOG_INFO("successfully check consistency") |
| .tag("signature", req.signature) |
| .tag("tablet_id", check_consistency_req.tablet_id) |
| .tag("checksum", checksum); |
| } |
| |
| TFinishTaskRequest finish_task_request; |
| finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request.__set_task_type(req.task_type); |
| finish_task_request.__set_signature(req.signature); |
| finish_task_request.__set_task_status(status.to_thrift()); |
| finish_task_request.__set_tablet_checksum(static_cast<int64_t>(checksum)); |
| finish_task_request.__set_request_version(check_consistency_req.version); |
| |
| finish_task(finish_task_request); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
| void report_task_callback(const ClusterInfo* cluster_info) { |
| TReportRequest request; |
| if (config::report_random_wait) { |
| random_sleep(5); |
| } |
| request.__isset.tasks = true; |
| { |
| std::lock_guard lock(s_task_signatures_mtx); |
| auto& tasks = request.tasks; |
| for (auto&& [task_type, signatures] : s_task_signatures) { |
| auto& set = tasks[task_type]; |
| for (auto&& signature : signatures) { |
| set.insert(signature); |
| } |
| } |
| } |
| request.__set_backend(BackendOptions::get_local_backend()); |
| request.__set_running_tasks(ExecEnv::GetInstance()->fragment_mgr()->running_query_num()); |
| bool succ = handle_report(request, cluster_info, "task"); |
| report_task_total << 1; |
| if (!succ) [[unlikely]] { |
| report_task_failed << 1; |
| } |
| } |
| |
| void report_disk_callback(StorageEngine& engine, const ClusterInfo* cluster_info) { |
| TReportRequest request; |
| request.__set_backend(BackendOptions::get_local_backend()); |
| request.__isset.disks = true; |
| |
| std::vector<DataDirInfo> data_dir_infos; |
| static_cast<void>(engine.get_all_data_dir_info(&data_dir_infos, true /* update */)); |
| |
| for (auto& root_path_info : data_dir_infos) { |
| TDisk disk; |
| disk.__set_root_path(root_path_info.path); |
| disk.__set_path_hash(root_path_info.path_hash); |
| disk.__set_storage_medium(root_path_info.storage_medium); |
| disk.__set_disk_total_capacity(root_path_info.disk_capacity); |
| disk.__set_data_used_capacity(root_path_info.local_used_capacity); |
| disk.__set_remote_used_capacity(root_path_info.remote_used_capacity); |
| disk.__set_disk_available_capacity(root_path_info.available); |
| disk.__set_trash_used_capacity(root_path_info.trash_used_capacity); |
| disk.__set_used(root_path_info.is_used); |
| request.disks[root_path_info.path] = disk; |
| } |
| request.__set_num_cores(CpuInfo::num_cores()); |
| request.__set_pipeline_executor_size(config::pipeline_executor_size > 0 |
| ? config::pipeline_executor_size |
| : CpuInfo::num_cores()); |
| bool succ = handle_report(request, cluster_info, "disk"); |
| report_disk_total << 1; |
| if (!succ) [[unlikely]] { |
| report_disk_failed << 1; |
| } |
| } |
| |
| void report_disk_callback(CloudStorageEngine& engine, const ClusterInfo* cluster_info) { |
| // Random sleep 1~5 seconds before doing report. |
| // In order to avoid the problem that the FE receives many report requests at the same time |
| // and can not be processed. |
| if (config::report_random_wait) { |
| random_sleep(5); |
| } |
| (void)engine; // To be used in the future |
| |
| TReportRequest request; |
| request.__set_backend(BackendOptions::get_local_backend()); |
| request.__isset.disks = true; |
| |
| // TODO(deardeng): report disk info in cloud mode. And make it more clear |
| // that report CPU by using a separte report procedure |
| // or abstracting disk report as "host info report" |
| request.__set_num_cores(CpuInfo::num_cores()); |
| request.__set_pipeline_executor_size(config::pipeline_executor_size > 0 |
| ? config::pipeline_executor_size |
| : CpuInfo::num_cores()); |
| bool succ = handle_report(request, cluster_info, "disk"); |
| report_disk_total << 1; |
| report_disk_failed << !succ; |
| } |
| |
// Builds and sends the full tablet report (all tablets on this BE) to FE,
// together with partition visible versions, max compaction score, and the
// currently-known storage policies/resources. If tablets change while the
// report is being built (detected via the global report version counter),
// the build is retried a few times and finally skipped to avoid FE acting
// on an inconsistent snapshot.
void report_tablet_callback(StorageEngine& engine, const ClusterInfo* cluster_info) {
    // Optional random 1~5s stagger so all BEs do not report simultaneously.
    if (config::report_random_wait) {
        random_sleep(5);
    }

    TReportRequest request;
    request.__set_backend(BackendOptions::get_local_backend());
    request.__isset.tablets = true;

    increase_report_version();
    uint64_t report_version;
    // Retry up to 5 times: if s_report_version changed while building the
    // tablet list, some concurrent tablet create/drop happened and the list
    // may be stale — rebuild it.
    for (int i = 0; i < 5; i++) {
        request.tablets.clear();
        report_version = s_report_version;
        engine.tablet_manager()->build_all_report_tablets_info(&request.tablets);
        if (report_version == s_report_version) {
            break;
        }
    }

    if (report_version < s_report_version) {
        // TODO llj This can only reduce the possibility for report error, but can't avoid it.
        // If FE create a tablet in FE meta and send CREATE task to this BE, the tablet may not be included in this
        // report, and the report version has a small probability that it has not been updated in time. When FE
        // receives this report, it is possible to delete the new tablet.
        LOG(WARNING) << "report version " << report_version << " change to " << s_report_version;
        DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1);
        return;
    }

    std::map<int64_t, int64_t> partitions_version;
    engine.tablet_manager()->get_partitions_visible_version(&partitions_version);
    request.__set_partitions_version(std::move(partitions_version));

    // The worse of base/cumulative max scores, used by FE for scheduling.
    int64_t max_compaction_score =
            std::max(DorisMetrics::instance()->tablet_cumulative_max_compaction_score->value(),
                     DorisMetrics::instance()->tablet_base_max_compaction_score->value());
    request.__set_tablet_max_compaction_score(max_compaction_score);
    request.__set_report_version(report_version);

    // report storage policy and resource
    auto& storage_policy_list = request.storage_policy;
    for (auto [id, version] : get_storage_policy_ids()) {
        auto& storage_policy = storage_policy_list.emplace_back();
        storage_policy.__set_id(id);
        storage_policy.__set_version(version);
    }
    request.__isset.storage_policy = true;
    auto& resource_list = request.resource;
    for (auto [id_str, version] : get_storage_resource_ids()) {
        auto& resource = resource_list.emplace_back();
        int64_t id = -1;
        // Resource ids are stored as strings locally but reported numerically;
        // a non-numeric id is logged and reported as a default-constructed entry.
        if (auto [_, ec] = std::from_chars(id_str.data(), id_str.data() + id_str.size(), id);
            ec != std::errc {}) [[unlikely]] {
            LOG(ERROR) << "invalid resource id format: " << id_str;
        } else {
            resource.__set_id(id);
            resource.__set_version(version);
        }
    }
    request.__isset.resource = true;

    bool succ = handle_report(request, cluster_info, "tablet");
    report_tablet_total << 1;
    if (!succ) [[unlikely]] {
        report_tablet_failed << 1;
    }
}
| |
| void report_tablet_callback(CloudStorageEngine& engine, const ClusterInfo* cluster_info) { |
| // Random sleep 1~5 seconds before doing report. |
| // In order to avoid the problem that the FE receives many report requests at the same time |
| // and can not be processed. |
| if (config::report_random_wait) { |
| random_sleep(5); |
| } |
| |
| TReportRequest request; |
| request.__set_backend(BackendOptions::get_local_backend()); |
| request.__isset.tablets = true; |
| |
| increase_report_version(); |
| uint64_t report_version; |
| uint64_t total_num_tablets = 0; |
| for (int i = 0; i < 5; i++) { |
| request.tablets.clear(); |
| report_version = s_report_version; |
| engine.tablet_mgr().build_all_report_tablets_info(&request.tablets, &total_num_tablets); |
| if (report_version == s_report_version) { |
| break; |
| } |
| } |
| |
| if (report_version < s_report_version) { |
| LOG(WARNING) << "report version " << report_version << " change to " << s_report_version; |
| DorisMetrics::instance()->report_all_tablets_requests_skip->increment(1); |
| return; |
| } |
| |
| request.__set_report_version(report_version); |
| request.__set_num_tablets(total_num_tablets); |
| |
| bool succ = handle_report(request, cluster_info, "tablet"); |
| report_tablet_total << 1; |
| if (!succ) [[unlikely]] { |
| report_tablet_failed << 1; |
| } |
| } |
| |
| void upload_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) { |
| const auto& upload_request = req.upload_req; |
| |
| LOG(INFO) << "get upload task. signature=" << req.signature |
| << ", job_id=" << upload_request.job_id; |
| |
| std::map<int64_t, std::vector<std::string>> tablet_files; |
| std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>( |
| engine, env, upload_request.job_id, req.signature, upload_request.broker_addr, |
| upload_request.broker_prop); |
| SCOPED_ATTACH_TASK(loader->resource_ctx()); |
| Status status = |
| loader->init(upload_request.__isset.storage_backend ? upload_request.storage_backend |
| : TStorageBackendType::type::BROKER, |
| upload_request.__isset.location ? upload_request.location : ""); |
| if (status.ok()) { |
| status = loader->upload(upload_request.src_dest_map, &tablet_files); |
| } |
| |
| if (!status.ok()) { |
| LOG_WARNING("failed to upload") |
| .tag("signature", req.signature) |
| .tag("job_id", upload_request.job_id) |
| .error(status); |
| } else { |
| LOG_INFO("successfully upload") |
| .tag("signature", req.signature) |
| .tag("job_id", upload_request.job_id); |
| } |
| |
| TFinishTaskRequest finish_task_request; |
| finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request.__set_task_type(req.task_type); |
| finish_task_request.__set_signature(req.signature); |
| finish_task_request.__set_task_status(status.to_thrift()); |
| finish_task_request.__set_tablet_files(tablet_files); |
| |
| finish_task(finish_task_request); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
| void download_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) { |
| const auto& download_request = req.download_req; |
| LOG(INFO) << "get download task. signature=" << req.signature |
| << ", job_id=" << download_request.job_id |
| << ", task detail: " << apache::thrift::ThriftDebugString(download_request); |
| |
| // TODO: download |
| std::vector<int64_t> downloaded_tablet_ids; |
| |
| auto status = Status::OK(); |
| if (download_request.__isset.remote_tablet_snapshots) { |
| std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>( |
| engine, env, download_request.job_id, req.signature); |
| status = loader->remote_http_download(download_request.remote_tablet_snapshots, |
| &downloaded_tablet_ids); |
| } else { |
| std::unique_ptr<SnapshotLoader> loader = std::make_unique<SnapshotLoader>( |
| engine, env, download_request.job_id, req.signature, download_request.broker_addr, |
| download_request.broker_prop); |
| status = loader->init(download_request.__isset.storage_backend |
| ? download_request.storage_backend |
| : TStorageBackendType::type::BROKER, |
| download_request.__isset.location ? download_request.location : ""); |
| if (status.ok()) { |
| status = loader->download(download_request.src_dest_map, &downloaded_tablet_ids); |
| } |
| } |
| |
| if (!status.ok()) { |
| LOG_WARNING("failed to download") |
| .tag("signature", req.signature) |
| .tag("job_id", download_request.job_id) |
| .error(status); |
| } else { |
| LOG_INFO("successfully download") |
| .tag("signature", req.signature) |
| .tag("job_id", download_request.job_id); |
| } |
| |
| TFinishTaskRequest finish_task_request; |
| finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request.__set_task_type(req.task_type); |
| finish_task_request.__set_signature(req.signature); |
| finish_task_request.__set_task_status(status.to_thrift()); |
| finish_task_request.__set_downloaded_tablet_ids(downloaded_tablet_ids); |
| |
| finish_task(finish_task_request); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
| void download_callback(CloudStorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) { |
| const auto& download_request = req.download_req; |
| LOG(INFO) << "get download task. signature=" << req.signature |
| << ", job_id=" << download_request.job_id |
| << ", task detail: " << apache::thrift::ThriftDebugString(download_request); |
| |
| std::vector<int64_t> transferred_tablet_ids; |
| |
| auto status = Status::OK(); |
| if (download_request.__isset.remote_tablet_snapshots) { |
| status = Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>( |
| "remote tablet snapshot is not supported."); |
| } else { |
| std::unique_ptr<CloudSnapshotLoader> loader = std::make_unique<CloudSnapshotLoader>( |
| engine, env, download_request.job_id, req.signature, download_request.broker_addr, |
| download_request.broker_prop); |
| status = loader->init(download_request.__isset.storage_backend |
| ? download_request.storage_backend |
| : TStorageBackendType::type::BROKER, |
| download_request.__isset.location ? download_request.location : "", |
| download_request.vault_id); |
| if (status.ok()) { |
| status = loader->download(download_request.src_dest_map, &transferred_tablet_ids); |
| } |
| |
| if (!status.ok()) { |
| LOG_WARNING("failed to download") |
| .tag("signature", req.signature) |
| .tag("job_id", download_request.job_id) |
| .error(status); |
| } else { |
| LOG_INFO("successfully download") |
| .tag("signature", req.signature) |
| .tag("job_id", download_request.job_id); |
| } |
| |
| TFinishTaskRequest finish_task_request; |
| finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request.__set_task_type(req.task_type); |
| finish_task_request.__set_signature(req.signature); |
| finish_task_request.__set_task_status(status.to_thrift()); |
| finish_task_request.__set_downloaded_tablet_ids(transferred_tablet_ids); |
| |
| finish_task(finish_task_request); |
| remove_task_info(req.task_type, req.signature); |
| } |
| } |
| |
| void make_snapshot_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
| const auto& snapshot_request = req.snapshot_req; |
| |
| LOG(INFO) << "get snapshot task. signature=" << req.signature; |
| |
| std::string snapshot_path; |
| bool allow_incremental_clone = false; // not used |
| std::vector<std::string> snapshot_files; |
| Status status = engine.snapshot_mgr()->make_snapshot(snapshot_request, &snapshot_path, |
| &allow_incremental_clone); |
| if (status.ok() && snapshot_request.__isset.list_files) { |
| // list and save all snapshot files |
| // snapshot_path like: data/snapshot/20180417205230.1.86400 |
| // we need to add subdir: tablet_id/schema_hash/ |
| std::vector<io::FileInfo> files; |
| bool exists = true; |
| io::Path path = fmt::format("{}/{}/{}/", snapshot_path, snapshot_request.tablet_id, |
| snapshot_request.schema_hash); |
| status = io::global_local_filesystem()->list(path, true, &files, &exists); |
| if (status.ok()) { |
| for (auto& file : files) { |
| snapshot_files.push_back(file.file_name); |
| } |
| } |
| } |
| if (!status.ok()) { |
| LOG_WARNING("failed to make snapshot") |
| .tag("signature", req.signature) |
| .tag("tablet_id", snapshot_request.tablet_id) |
| .tag("version", snapshot_request.version) |
| .error(status); |
| } else { |
| LOG_INFO("successfully make snapshot") |
| .tag("signature", req.signature) |
| .tag("tablet_id", snapshot_request.tablet_id) |
| .tag("version", snapshot_request.version) |
| .tag("snapshot_path", snapshot_path); |
| } |
| |
| TFinishTaskRequest finish_task_request; |
| finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request.__set_task_type(req.task_type); |
| finish_task_request.__set_signature(req.signature); |
| finish_task_request.__set_snapshot_path(snapshot_path); |
| finish_task_request.__set_snapshot_files(snapshot_files); |
| finish_task_request.__set_task_status(status.to_thrift()); |
| |
| finish_task(finish_task_request); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
| void release_snapshot_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
| const auto& release_snapshot_request = req.release_snapshot_req; |
| |
| LOG(INFO) << "get release snapshot task. signature=" << req.signature; |
| |
| const std::string& snapshot_path = release_snapshot_request.snapshot_path; |
| Status status = engine.snapshot_mgr()->release_snapshot(snapshot_path); |
| if (!status.ok()) { |
| LOG_WARNING("failed to release snapshot") |
| .tag("signature", req.signature) |
| .tag("snapshot_path", snapshot_path) |
| .error(status); |
| } else { |
| LOG_INFO("successfully release snapshot") |
| .tag("signature", req.signature) |
| .tag("snapshot_path", snapshot_path); |
| } |
| |
| TFinishTaskRequest finish_task_request; |
| finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request.__set_task_type(req.task_type); |
| finish_task_request.__set_signature(req.signature); |
| finish_task_request.__set_task_status(status.to_thrift()); |
| |
| finish_task(finish_task_request); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
| void release_snapshot_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) { |
| const auto& release_snapshot_request = req.release_snapshot_req; |
| |
| LOG(INFO) << "get release snapshot task. signature=" << req.signature; |
| |
| Status status = engine.cloud_snapshot_mgr().release_snapshot( |
| release_snapshot_request.tablet_id, release_snapshot_request.is_job_completed); |
| |
| if (!status.ok()) { |
| LOG_WARNING("failed to release snapshot") |
| .tag("signature", req.signature) |
| .tag("tablet_id", release_snapshot_request.tablet_id) |
| .tag("is_job_completed", release_snapshot_request.is_job_completed) |
| .error(status); |
| } else { |
| LOG_INFO("successfully release snapshot") |
| .tag("signature", req.signature) |
| .tag("tablet_id", release_snapshot_request.tablet_id) |
| .tag("is_job_completed", release_snapshot_request.is_job_completed); |
| } |
| |
| TFinishTaskRequest finish_task_request; |
| finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request.__set_task_type(req.task_type); |
| finish_task_request.__set_signature(req.signature); |
| finish_task_request.__set_task_status(status.to_thrift()); |
| |
| finish_task(finish_task_request); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
| void move_dir_callback(StorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) { |
| const auto& move_dir_req = req.move_dir_req; |
| |
| LOG(INFO) << "get move dir task. signature=" << req.signature |
| << ", job_id=" << move_dir_req.job_id; |
| Status status; |
| auto tablet = engine.tablet_manager()->get_tablet(move_dir_req.tablet_id); |
| if (tablet == nullptr) { |
| status = Status::InvalidArgument("Could not find tablet"); |
| } else { |
| SnapshotLoader loader(engine, env, move_dir_req.job_id, move_dir_req.tablet_id); |
| status = loader.move(move_dir_req.src, tablet, true); |
| } |
| |
| if (!status.ok()) { |
| LOG_WARNING("failed to move dir") |
| .tag("signature", req.signature) |
| .tag("job_id", move_dir_req.job_id) |
| .tag("tablet_id", move_dir_req.tablet_id) |
| .tag("src", move_dir_req.src) |
| .error(status); |
| } else { |
| LOG_INFO("successfully move dir") |
| .tag("signature", req.signature) |
| .tag("job_id", move_dir_req.job_id) |
| .tag("tablet_id", move_dir_req.tablet_id) |
| .tag("src", move_dir_req.src); |
| } |
| |
| TFinishTaskRequest finish_task_request; |
| finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request.__set_task_type(req.task_type); |
| finish_task_request.__set_signature(req.signature); |
| finish_task_request.__set_task_status(status.to_thrift()); |
| |
| finish_task(finish_task_request); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
| void move_dir_callback(CloudStorageEngine& engine, ExecEnv* env, const TAgentTaskRequest& req) { |
| const auto& move_dir_req = req.move_dir_req; |
| |
| LOG(INFO) << "get move dir task. signature=" << req.signature |
| << ", job_id=" << move_dir_req.job_id; |
| |
| Status status = engine.cloud_snapshot_mgr().commit_snapshot(move_dir_req.tablet_id); |
| if (!status.ok()) { |
| LOG_WARNING("failed to move dir") |
| .tag("signature", req.signature) |
| .tag("job_id", move_dir_req.job_id) |
| .tag("tablet_id", move_dir_req.tablet_id) |
| .error(status); |
| } else { |
| LOG_INFO("successfully move dir") |
| .tag("signature", req.signature) |
| .tag("job_id", move_dir_req.job_id) |
| .tag("tablet_id", move_dir_req.tablet_id); |
| } |
| |
| TFinishTaskRequest finish_task_request; |
| finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request.__set_task_type(req.task_type); |
| finish_task_request.__set_signature(req.signature); |
| finish_task_request.__set_task_status(status.to_thrift()); |
| |
| finish_task(finish_task_request); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
| void submit_table_compaction_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
| const auto& compaction_req = req.compaction_req; |
| |
| LOG(INFO) << "get compaction task. signature=" << req.signature |
| << ", compaction_type=" << compaction_req.type; |
| |
| CompactionType compaction_type; |
| if (compaction_req.type == "base") { |
| compaction_type = CompactionType::BASE_COMPACTION; |
| } else { |
| compaction_type = CompactionType::CUMULATIVE_COMPACTION; |
| } |
| |
| auto tablet_ptr = engine.tablet_manager()->get_tablet(compaction_req.tablet_id); |
| if (tablet_ptr != nullptr) { |
| auto* data_dir = tablet_ptr->data_dir(); |
| if (!tablet_ptr->can_do_compaction(data_dir->path_hash(), compaction_type)) { |
| LOG(WARNING) << "could not do compaction. tablet_id=" << tablet_ptr->tablet_id() |
| << ", compaction_type=" << compaction_type; |
| return; |
| } |
| |
| Status status = engine.submit_compaction_task(tablet_ptr, compaction_type, false); |
| if (!status.ok()) { |
| LOG(WARNING) << "failed to submit table compaction task. error=" << status; |
| } |
| } |
| } |
| |
namespace {

// Creates or refreshes the S3 file system registered for a storage resource
// pushed from FE, then publishes it (with its version) into the global
// storage-resource registry on success.
//
// `existed_fs` is the FS currently registered under `param.id`, or nullptr
// if this BE has not seen the resource before. On failure nothing is
// published, so any previously registered entry remains in effect.
void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) {
    Status st;
    io::RemoteFileSystemSPtr fs;

    if (!existed_fs) {
        // No such FS instance on BE
        auto res = io::S3FileSystem::create(S3Conf::get_s3_conf(param.s3_storage_param),
                                            std::to_string(param.id));
        if (!res.has_value()) {
            st = std::move(res).error();
        } else {
            fs = std::move(res).value();
        }
    } else {
        DCHECK_EQ(existed_fs->type(), io::FileSystemType::S3) << param.id << ' ' << param.name;
        // Reuse the existing FS object but reset its S3 client with the new
        // connection conf (credentials/endpoint may have changed).
        auto client = static_cast<io::S3FileSystem*>(existed_fs.get())->client_holder();
        auto new_s3_conf = S3Conf::get_s3_conf(param.s3_storage_param);
        S3ClientConf conf = std::move(new_s3_conf.client_conf);
        st = client->reset(conf);
        fs = std::move(existed_fs);
    }

    if (!st.ok()) {
        LOG(WARNING) << "update s3 resource failed: " << st;
    } else {
        LOG_INFO("successfully update s3 resource")
                .tag("resource_id", param.id)
                .tag("resource_name", param.name);
        put_storage_resource(param.id, {std::move(fs)}, param.version);
    }
}

// HDFS counterpart of update_s3_resource: creates the HDFS file system for a
// new resource and publishes it. For an already-known resource the existing
// FS is republished unchanged (in-place HDFS conf update is still a TODO),
// which at least bumps the registered version.
void update_hdfs_resource(const TStorageResource& param, io::RemoteFileSystemSPtr existed_fs) {
    Status st;
    io::RemoteFileSystemSPtr fs;
    // Root path is optional in the thrift param; default to "".
    std::string root_path =
            param.hdfs_storage_param.__isset.root_path ? param.hdfs_storage_param.root_path : "";

    if (!existed_fs) {
        // No such FS instance on BE
        auto res = io::HdfsFileSystem::create(
                param.hdfs_storage_param, param.hdfs_storage_param.fs_name,
                std::to_string(param.id), nullptr, std::move(root_path));
        if (!res.has_value()) {
            st = std::move(res).error();
        } else {
            fs = std::move(res).value();
        }

    } else {
        DCHECK_EQ(existed_fs->type(), io::FileSystemType::HDFS) << param.id << ' ' << param.name;
        // TODO(plat1ko): update hdfs conf
        fs = std::move(existed_fs);
    }

    if (!st.ok()) {
        LOG(WARNING) << "update hdfs resource failed: " << st;
    } else {
        LOG_INFO("successfully update hdfs resource")
                .tag("resource_id", param.id)
                .tag("resource_name", param.name)
                .tag("root_path", fs->root_path().string());
        put_storage_resource(param.id, {std::move(fs)}, param.version);
    }
}

} // namespace
| |
| void push_storage_policy_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
| const auto& push_storage_policy_req = req.push_storage_policy_req; |
| // refresh resource |
| for (auto&& param : push_storage_policy_req.resource) { |
| io::RemoteFileSystemSPtr fs; |
| if (auto existed_resource = get_storage_resource(param.id); existed_resource) { |
| if (existed_resource->second >= param.version) { |
| // Stale request, ignore |
| continue; |
| } |
| |
| fs = std::move(existed_resource->first.fs); |
| } |
| |
| if (param.__isset.s3_storage_param) { |
| update_s3_resource(param, std::move(fs)); |
| } else if (param.__isset.hdfs_storage_param) { |
| update_hdfs_resource(param, std::move(fs)); |
| } else { |
| LOG(WARNING) << "unknown resource=" << param; |
| } |
| } |
| // drop storage policy |
| for (auto policy_id : push_storage_policy_req.dropped_storage_policy) { |
| delete_storage_policy(policy_id); |
| } |
| // refresh storage policy |
| for (auto&& storage_policy : push_storage_policy_req.storage_policy) { |
| auto existed_storage_policy = get_storage_policy(storage_policy.id); |
| if (existed_storage_policy == nullptr || |
| existed_storage_policy->version < storage_policy.version) { |
| auto storage_policy1 = std::make_shared<StoragePolicy>(); |
| storage_policy1->name = storage_policy.name; |
| storage_policy1->version = storage_policy.version; |
| storage_policy1->cooldown_datetime = storage_policy.cooldown_datetime; |
| storage_policy1->cooldown_ttl = storage_policy.cooldown_ttl; |
| storage_policy1->resource_id = storage_policy.resource_id; |
| LOG_INFO("successfully update storage policy") |
| .tag("storage_policy_id", storage_policy.id) |
| .tag("storage_policy", storage_policy1->to_string()); |
| put_storage_policy(storage_policy.id, std::move(storage_policy1)); |
| } |
| } |
| } |
| |
| void push_index_policy_callback(const TAgentTaskRequest& req) { |
| const auto& request = req.push_index_policy_req; |
| doris::ExecEnv::GetInstance()->index_policy_mgr()->apply_policy_changes( |
| request.index_policys, request.dropped_index_policys); |
| } |
| |
| void push_cooldown_conf_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
| const auto& push_cooldown_conf_req = req.push_cooldown_conf; |
| for (const auto& cooldown_conf : push_cooldown_conf_req.cooldown_confs) { |
| int64_t tablet_id = cooldown_conf.tablet_id; |
| TabletSharedPtr tablet = engine.tablet_manager()->get_tablet(tablet_id); |
| if (tablet == nullptr) { |
| LOG(WARNING) << "failed to get tablet. tablet_id=" << tablet_id; |
| continue; |
| } |
| if (tablet->update_cooldown_conf(cooldown_conf.cooldown_term, |
| cooldown_conf.cooldown_replica_id) && |
| cooldown_conf.cooldown_replica_id == tablet->replica_id() && |
| tablet->tablet_meta()->cooldown_meta_id().initialized()) { |
| Tablet::async_write_cooldown_meta(tablet); |
| } |
| } |
| } |
| |
// Create a tablet as instructed by FE and report the result back via
// finish_task. On success the reply carries a TTabletInfo describing the new
// tablet. A runtime profile is printed when the task runs longer than
// config::agent_task_trace_threshold_sec seconds.
void create_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    const auto& create_tablet_req = req.create_tablet_req;
    RuntimeProfile runtime_profile("CreateTablet");
    RuntimeProfile* profile = &runtime_profile;
    MonotonicStopWatch watch;
    watch.start();
    // On scope exit: dump the profile if the task exceeded the trace threshold.
    Defer defer = [&] {
        auto elapsed_time = static_cast<double>(watch.elapsed_time());
        if (elapsed_time / 1e9 > config::agent_task_trace_threshold_sec) {
            COUNTER_UPDATE(profile->total_time_counter(), elapsed_time);
            std::stringstream ss;
            profile->pretty_print(&ss);
            LOG(WARNING) << "create tablet cost(s) " << elapsed_time / 1e9 << std::endl << ss.str();
        }
    };
    DorisMetrics::instance()->create_tablet_requests_total->increment(1);
    VLOG_NOTICE << "start to create tablet " << create_tablet_req.tablet_id;

    std::vector<TTabletInfo> finish_tablet_infos;
    VLOG_NOTICE << "create tablet: " << create_tablet_req;
    Status status = engine.create_tablet(create_tablet_req, profile);
    if (!status.ok()) {
        DorisMetrics::instance()->create_tablet_requests_failed->increment(1);
        LOG_WARNING("failed to create tablet, reason={}", status.to_string())
                .tag("signature", req.signature)
                .tag("tablet_id", create_tablet_req.tablet_id)
                .error(status);
    } else {
        // A new tablet exists; bump the report version so FE picks it up in
        // the next tablet report.
        increase_report_version();
        // get path hash of the created tablet
        TabletSharedPtr tablet;
        {
            SCOPED_TIMER(ADD_TIMER(profile, "GetTablet"));
            tablet = engine.tablet_manager()->get_tablet(create_tablet_req.tablet_id);
        }
        DCHECK(tablet != nullptr);
        TTabletInfo tablet_info;
        tablet_info.tablet_id = tablet->tablet_id();
        tablet_info.schema_hash = tablet->schema_hash();
        tablet_info.version = create_tablet_req.version;
        // Useless but it is a required field in TTabletInfo
        tablet_info.version_hash = 0;
        tablet_info.row_count = 0;
        tablet_info.data_size = 0;
        tablet_info.__set_path_hash(tablet->data_dir()->path_hash());
        tablet_info.__set_replica_id(tablet->replica_id());
        finish_tablet_infos.push_back(tablet_info);
        LOG_INFO("successfully create tablet")
                .tag("signature", req.signature)
                .tag("tablet_id", create_tablet_req.tablet_id);
    }
    // Acknowledge the task to FE regardless of success or failure.
    TFinishTaskRequest finish_task_request;
    finish_task_request.__set_finish_tablet_infos(finish_tablet_infos);
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
    finish_task_request.__set_report_version(s_report_version);
    finish_task_request.__set_task_type(req.task_type);
    finish_task_request.__set_signature(req.signature);
    finish_task_request.__set_task_status(status.to_thrift());
    finish_task(finish_task_request);
    remove_task_info(req.task_type, req.signature);
}
| |
| void drop_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
| const auto& drop_tablet_req = req.drop_tablet_req; |
| Status status; |
| auto dropped_tablet = engine.tablet_manager()->get_tablet(drop_tablet_req.tablet_id, false); |
| if (dropped_tablet != nullptr) { |
| status = engine.tablet_manager()->drop_tablet(drop_tablet_req.tablet_id, |
| drop_tablet_req.replica_id, |
| drop_tablet_req.is_drop_table_or_partition); |
| } else { |
| status = Status::NotFound("could not find tablet {}", drop_tablet_req.tablet_id); |
| } |
| if (status.ok()) { |
| // if tablet is dropped by fe, then the related txn should also be removed |
| engine.txn_manager()->force_rollback_tablet_related_txns( |
| dropped_tablet->data_dir()->get_meta(), drop_tablet_req.tablet_id, |
| dropped_tablet->tablet_uid()); |
| LOG_INFO("successfully drop tablet") |
| .tag("signature", req.signature) |
| .tag("tablet_id", drop_tablet_req.tablet_id) |
| .tag("replica_id", drop_tablet_req.replica_id); |
| } else { |
| LOG_WARNING("failed to drop tablet") |
| .tag("signature", req.signature) |
| .tag("tablet_id", drop_tablet_req.tablet_id) |
| .tag("replica_id", drop_tablet_req.replica_id) |
| .error(status); |
| } |
| |
| TFinishTaskRequest finish_task_request; |
| finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request.__set_task_type(req.task_type); |
| finish_task_request.__set_signature(req.signature); |
| finish_task_request.__set_task_status(status.to_thrift()); |
| |
| TTabletInfo tablet_info; |
| tablet_info.tablet_id = drop_tablet_req.tablet_id; |
| tablet_info.schema_hash = drop_tablet_req.schema_hash; |
| tablet_info.version = 0; |
| // Useless but it is a required field in TTabletInfo |
| tablet_info.version_hash = 0; |
| tablet_info.row_count = 0; |
| tablet_info.data_size = 0; |
| |
| finish_task_request.__set_finish_tablet_infos({tablet_info}); |
| LOG_INFO("successfully drop tablet") |
| .tag("signature", req.signature) |
| .tag("tablet_id", drop_tablet_req.tablet_id); |
| |
| finish_task(finish_task_request); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
| void drop_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) { |
| const auto& drop_tablet_req = req.drop_tablet_req; |
| // here drop_tablet_req.tablet_id is the signature of the task, see DropReplicaTask in fe |
| Defer defer = [&] { remove_task_info(req.task_type, req.signature); }; |
| DBUG_EXECUTE_IF("WorkPoolCloudDropTablet.drop_tablet_callback.failed", { |
| LOG_WARNING("WorkPoolCloudDropTablet.drop_tablet_callback.failed") |
| .tag("tablet_id", drop_tablet_req.tablet_id); |
| return; |
| }); |
| MonotonicStopWatch watch; |
| watch.start(); |
| auto weak_tablets = engine.tablet_mgr().get_weak_tablets(); |
| std::ostringstream rowset_ids_stream; |
| bool found = false; |
| for (auto& weak_tablet : weak_tablets) { |
| auto tablet = weak_tablet.lock(); |
| if (tablet == nullptr) { |
| continue; |
| } |
| if (tablet->tablet_id() != drop_tablet_req.tablet_id) { |
| continue; |
| } |
| found = true; |
| auto clean_rowsets = tablet->get_snapshot_rowset(true); |
| // Get first 10 rowset IDs as comma-separated string, just for log |
| int count = 0; |
| for (const auto& rowset : clean_rowsets) { |
| if (count >= 10) break; |
| if (count > 0) { |
| rowset_ids_stream << ","; |
| } |
| rowset_ids_stream << rowset->rowset_id().to_string(); |
| count++; |
| } |
| |
| CloudTablet::recycle_cached_data(clean_rowsets); |
| break; |
| } |
| |
| if (!found) { |
| LOG(WARNING) << "tablet not found when dropping tablet_id=" << drop_tablet_req.tablet_id |
| << ", cost " << static_cast<double>(watch.elapsed_time()) / 1e9 << "(s)"; |
| return; |
| } |
| |
| engine.tablet_mgr().erase_tablet(drop_tablet_req.tablet_id); |
| LOG(INFO) << "drop cloud tablet_id=" << drop_tablet_req.tablet_id |
| << " and clean file cache first 10 rowsets {" << rowset_ids_stream.str() << "}, cost " |
| << static_cast<double>(watch.elapsed_time()) / 1e9 << "(s)"; |
| } |
| |
| void push_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
| const auto& push_req = req.push_req; |
| |
| LOG(INFO) << "get push task. signature=" << req.signature |
| << " push_type=" << push_req.push_type; |
| std::vector<TTabletInfo> tablet_infos; |
| |
| // exist a path task_worker_pool <- agent_server <- backend_service <- BackendService |
| // use the arg BackendService_submit_tasks_args.tasks is not const |
| // and push_req will be modify, so modify is ok |
| EngineBatchLoadTask engine_task(engine, const_cast<TPushReq&>(push_req), &tablet_infos); |
| SCOPED_ATTACH_TASK(engine_task.mem_tracker()); |
| auto status = engine_task.execute(); |
| |
| // Return result to fe |
| TFinishTaskRequest finish_task_request; |
| finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request.__set_task_type(req.task_type); |
| finish_task_request.__set_signature(req.signature); |
| if (push_req.push_type == TPushType::DELETE) { |
| finish_task_request.__set_request_version(push_req.version); |
| } |
| |
| if (status.ok()) { |
| LOG_INFO("successfully execute push task") |
| .tag("signature", req.signature) |
| .tag("tablet_id", push_req.tablet_id) |
| .tag("push_type", push_req.push_type); |
| increase_report_version(); |
| finish_task_request.__set_finish_tablet_infos(tablet_infos); |
| } else { |
| LOG_WARNING("failed to execute push task") |
| .tag("signature", req.signature) |
| .tag("tablet_id", push_req.tablet_id) |
| .tag("push_type", push_req.push_type) |
| .error(status); |
| } |
| finish_task_request.__set_task_status(status.to_thrift()); |
| finish_task_request.__set_report_version(s_report_version); |
| |
| finish_task(finish_task_request); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
| void cloud_push_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) { |
| const auto& push_req = req.push_req; |
| |
| LOG(INFO) << "get push task. signature=" << req.signature |
| << " push_type=" << push_req.push_type; |
| |
| // Return result to fe |
| TFinishTaskRequest finish_task_request; |
| finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request.__set_task_type(req.task_type); |
| finish_task_request.__set_signature(req.signature); |
| |
| // Only support DELETE in cloud mode now |
| if (push_req.push_type != TPushType::DELETE) { |
| finish_task_request.__set_task_status( |
| Status::NotSupported("push_type {} not is supported", |
| std::to_string(push_req.push_type)) |
| .to_thrift()); |
| return; |
| } |
| |
| finish_task_request.__set_request_version(push_req.version); |
| |
| DorisMetrics::instance()->delete_requests_total->increment(1); |
| auto st = CloudDeleteTask::execute(engine, req.push_req); |
| if (st.ok()) { |
| LOG_INFO("successfully execute push task") |
| .tag("signature", req.signature) |
| .tag("tablet_id", push_req.tablet_id) |
| .tag("push_type", push_req.push_type); |
| increase_report_version(); |
| auto& tablet_info = finish_task_request.finish_tablet_infos.emplace_back(); |
| // Just need tablet_id |
| tablet_info.tablet_id = push_req.tablet_id; |
| finish_task_request.__isset.finish_tablet_infos = true; |
| } else { |
| DorisMetrics::instance()->delete_requests_failed->increment(1); |
| LOG_WARNING("failed to execute push task") |
| .tag("signature", req.signature) |
| .tag("tablet_id", push_req.tablet_id) |
| .tag("push_type", push_req.push_type) |
| .error(st); |
| } |
| |
| finish_task_request.__set_task_status(st.to_thrift()); |
| finish_task_request.__set_report_version(s_report_version); |
| |
| finish_task(finish_task_request); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
// Dedicated worker pool for PUBLISH_VERSION tasks; each worker thread invokes
// publish_version_callback() on a dequeued task.
PublishVersionWorkerPool::PublishVersionWorkerPool(StorageEngine& engine)
        : TaskWorkerPool("PUBLISH_VERSION", config::publish_version_worker_count,
                         [this](const TAgentTaskRequest& task) { publish_version_callback(task); }),
          _engine(engine) {}
| |
// Defaulted out of line; nothing to tear down beyond base-class cleanup.
PublishVersionWorkerPool::~PublishVersionWorkerPool() = default;
| |
// Publish a transaction's version on all involved tablets, retrying up to
// PUBLISH_VERSION_MAX_RETRY times. Tablets whose versions are not yet
// continuous are either re-queued on this pool (while within the task
// timeout) or handed to the engine's async publish path. On overall success,
// cumulative compaction may be triggered for tablets that accumulated enough
// versions. The final status, succeeded tablets, error tablets and per-table
// delta row counts are reported back to FE.
void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest& req) {
    const auto& publish_version_req = req.publish_version_req;
    DorisMetrics::instance()->publish_task_request_total->increment(1);
    VLOG_NOTICE << "get publish version task. signature=" << req.signature;

    std::set<TTabletId> error_tablet_ids;
    std::map<TTabletId, TVersion> succ_tablets;
    // partition_id, tablet_id, publish_version
    std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets;
    std::map<TTableId, std::map<TTabletId, int64_t>> table_id_to_tablet_id_to_num_delta_rows;
    uint32_t retry_time = 0;
    Status status;
    constexpr uint32_t PUBLISH_VERSION_MAX_RETRY = 3;
    while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
        // Reset per-attempt outputs; discontinuous_version_tablets is kept
        // across attempts and consumed after the loop.
        succ_tablets.clear();
        error_tablet_ids.clear();
        table_id_to_tablet_id_to_num_delta_rows.clear();
        EnginePublishVersionTask engine_task(_engine, publish_version_req, &error_tablet_ids,
                                             &succ_tablets, &discontinuous_version_tablets,
                                             &table_id_to_tablet_id_to_num_delta_rows);
        SCOPED_ATTACH_TASK(engine_task.mem_tracker());
        status = engine_task.execute();
        if (status.ok()) {
            break;
        }

        if (status.is<PUBLISH_VERSION_NOT_CONTINUOUS>()) {
            // there are too many missing versions, it has been be added to async
            // publish task, so no need to retry here.
            if (discontinuous_version_tablets.empty()) {
                break;
            }
            LOG_EVERY_SECOND(INFO) << "wait for previous publish version task to be done, "
                                   << "transaction_id: " << publish_version_req.transaction_id;

            // Stop re-queueing once the task exceeded its overall timeout.
            int64_t time_elapsed = time(nullptr) - req.recv_time;
            if (time_elapsed > config::publish_version_task_timeout_s) {
                LOG(INFO) << "task elapsed " << time_elapsed
                          << " seconds since it is inserted to queue, it is timeout";
                break;
            }

            // Version not continuous, put to queue and wait pre version publish task execute
            PUBLISH_VERSION_count << 1;
            auto st = _thread_pool->submit_func([this, req] {
                this->publish_version_callback(req);
                PUBLISH_VERSION_count << -1;
            });
            if (!st.ok()) [[unlikely]] {
                PUBLISH_VERSION_count << -1;
                status = std::move(st);
            } else {
                // Re-queued successfully; the re-submitted invocation will
                // report to FE, so this one returns without reporting.
                return;
            }
        }

        LOG_WARNING("failed to publish version")
                .tag("transaction_id", publish_version_req.transaction_id)
                .tag("error_tablets_num", error_tablet_ids.size())
                .tag("retry_time", retry_time)
                .error(status);
        ++retry_time;
    }

    // Hand remaining version-gap tablets to the async publish mechanism.
    for (auto& item : discontinuous_version_tablets) {
        _engine.add_async_publish_task(std::get<0>(item), std::get<1>(item), std::get<2>(item),
                                       publish_version_req.transaction_id, false);
    }
    TFinishTaskRequest finish_task_request;
    if (!status.ok()) [[unlikely]] {
        DorisMetrics::instance()->publish_task_failed_total->increment(1);
        // if publish failed, return failed, FE will ignore this error and
        // check error tablet ids and FE will also republish this task
        LOG_WARNING("failed to publish version")
                .tag("signature", req.signature)
                .tag("transaction_id", publish_version_req.transaction_id)
                .tag("error_tablets_num", error_tablet_ids.size())
                .error(status);
    } else {
        // Optionally trigger cumulative compaction on tablets that received
        // many versions; skipped when auto compaction is disabled or when
        // compaction is paused due to memory pressure.
        if (!config::disable_auto_compaction &&
            (!config::enable_compaction_pause_on_high_memory ||
             !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE))) {
            for (auto [tablet_id, _] : succ_tablets) {
                TabletSharedPtr tablet = _engine.tablet_manager()->get_tablet(tablet_id);
                if (tablet != nullptr) {
                    if (!tablet->tablet_meta()->tablet_schema()->disable_auto_compaction()) {
                        tablet->published_count.fetch_add(1);
                        int64_t published_count = tablet->published_count.load();
                        int32_t max_version_config = tablet->max_version_config();
                        // Only consider every 20th publish once the tablet is
                        // close to its version limit.
                        if (tablet->exceed_version_limit(
                                    max_version_config *
                                    config::load_trigger_compaction_version_percent / 100) &&
                            published_count % 20 == 0) {
                            auto st = _engine.submit_compaction_task(
                                    tablet, CompactionType::CUMULATIVE_COMPACTION, true, false);
                            if (!st.ok()) [[unlikely]] {
                                LOG(WARNING) << "trigger compaction failed, tablet_id=" << tablet_id
                                             << ", published=" << published_count << " : " << st;
                            } else {
                                LOG(INFO) << "trigger compaction succ, tablet_id:" << tablet_id
                                          << ", published:" << published_count;
                            }
                        }
                    }
                } else {
                    LOG(WARNING) << "trigger compaction failed, tablet_id:" << tablet_id;
                }
            }
        }
        int64_t cost_second = time(nullptr) - req.recv_time;
        g_publish_version_latency << cost_second;
        LOG_INFO("successfully publish version")
                .tag("signature", req.signature)
                .tag("transaction_id", publish_version_req.transaction_id)
                .tag("tablets_num", succ_tablets.size())
                .tag("cost(s)", cost_second);
    }

    status.to_thrift(&finish_task_request.task_status);
    finish_task_request.__set_backend(BackendOptions::get_local_backend());
    finish_task_request.__set_task_type(req.task_type);
    finish_task_request.__set_signature(req.signature);
    finish_task_request.__set_report_version(s_report_version);
    finish_task_request.__set_succ_tablets(succ_tablets);
    finish_task_request.__set_error_tablet_ids(
            std::vector<TTabletId>(error_tablet_ids.begin(), error_tablet_ids.end()));
    finish_task_request.__set_table_id_to_tablet_id_to_delta_num_rows(
            table_id_to_tablet_id_to_num_delta_rows);
    finish_task(finish_task_request);
    remove_task_info(req.task_type, req.signature);
}
| |
| void clear_transaction_task_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
| const auto& clear_transaction_task_req = req.clear_transaction_task_req; |
| LOG(INFO) << "get clear transaction task. signature=" << req.signature |
| << ", transaction_id=" << clear_transaction_task_req.transaction_id |
| << ", partition_id_size=" << clear_transaction_task_req.partition_id.size(); |
| |
| Status status; |
| |
| if (clear_transaction_task_req.transaction_id > 0) { |
| // transaction_id should be greater than zero. |
| // If it is not greater than zero, no need to execute |
| // the following clear_transaction_task() function. |
| if (!clear_transaction_task_req.partition_id.empty()) { |
| engine.clear_transaction_task(clear_transaction_task_req.transaction_id, |
| clear_transaction_task_req.partition_id); |
| } else { |
| engine.clear_transaction_task(clear_transaction_task_req.transaction_id); |
| } |
| LOG(INFO) << "finish to clear transaction task. signature=" << req.signature |
| << ", transaction_id=" << clear_transaction_task_req.transaction_id; |
| } else { |
| LOG(WARNING) << "invalid transaction id " << clear_transaction_task_req.transaction_id |
| << ". signature= " << req.signature; |
| } |
| |
| TFinishTaskRequest finish_task_request; |
| finish_task_request.__set_task_status(status.to_thrift()); |
| finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request.__set_task_type(req.task_type); |
| finish_task_request.__set_signature(req.signature); |
| |
| finish_task(finish_task_request); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
| void alter_tablet_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
| int64_t signature = req.signature; |
| LOG(INFO) << "get alter table task, signature: " << signature; |
| bool is_task_timeout = false; |
| if (req.__isset.recv_time) { |
| int64_t time_elapsed = time(nullptr) - req.recv_time; |
| if (time_elapsed > config::report_task_interval_seconds * 20) { |
| LOG(INFO) << "task elapsed " << time_elapsed |
| << " seconds since it is inserted to queue, it is timeout"; |
| is_task_timeout = true; |
| } |
| } |
| if (!is_task_timeout) { |
| TFinishTaskRequest finish_task_request; |
| TTaskType::type task_type = req.task_type; |
| alter_tablet(engine, req, signature, task_type, &finish_task_request); |
| finish_task(finish_task_request); |
| } |
| doris::g_fragment_executing_count << -1; |
| int64_t now = duration_cast<std::chrono::milliseconds>( |
| std::chrono::system_clock::now().time_since_epoch()) |
| .count(); |
| g_fragment_last_active_time.set_value(now); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
| void alter_cloud_tablet_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) { |
| int64_t signature = req.signature; |
| LOG(INFO) << "get alter table task, signature: " << signature; |
| bool is_task_timeout = false; |
| if (req.__isset.recv_time) { |
| int64_t time_elapsed = time(nullptr) - req.recv_time; |
| if (time_elapsed > config::report_task_interval_seconds * 20) { |
| LOG(INFO) << "task elapsed " << time_elapsed |
| << " seconds since it is inserted to queue, it is timeout"; |
| is_task_timeout = true; |
| } |
| } |
| if (!is_task_timeout) { |
| TFinishTaskRequest finish_task_request; |
| TTaskType::type task_type = req.task_type; |
| alter_cloud_tablet(engine, req, signature, task_type, &finish_task_request); |
| finish_task(finish_task_request); |
| } |
| doris::g_fragment_executing_count << -1; |
| int64_t now = duration_cast<std::chrono::milliseconds>( |
| std::chrono::system_clock::now().time_since_epoch()) |
| .count(); |
| g_fragment_last_active_time.set_value(now); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
| void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
| std::unordered_map<int64_t, int64_t> gc_tablet_infos; |
| if (!req.__isset.gc_binlog_req) { |
| LOG(WARNING) << "gc binlog task is not valid"; |
| return; |
| } |
| if (!req.gc_binlog_req.__isset.tablet_gc_binlog_infos) { |
| LOG(WARNING) << "gc binlog task tablet_gc_binlog_infos is not valid"; |
| return; |
| } |
| |
| const auto& tablet_gc_binlog_infos = req.gc_binlog_req.tablet_gc_binlog_infos; |
| for (auto&& tablet_info : tablet_gc_binlog_infos) { |
| // gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.schema_hash); |
| gc_tablet_infos.emplace(tablet_info.tablet_id, tablet_info.version); |
| } |
| |
| engine.gc_binlogs(gc_tablet_infos); |
| } |
| |
| void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
| const TVisibleVersionReq& visible_version_req = req.visible_version_req; |
| engine.tablet_manager()->update_partitions_visible_version( |
| visible_version_req.partition_version); |
| } |
| |
| void clone_callback(StorageEngine& engine, const ClusterInfo* cluster_info, |
| const TAgentTaskRequest& req) { |
| const auto& clone_req = req.clone_req; |
| |
| DorisMetrics::instance()->clone_requests_total->increment(1); |
| LOG(INFO) << "get clone task. signature=" << req.signature; |
| |
| std::vector<TTabletInfo> tablet_infos; |
| EngineCloneTask engine_task(engine, clone_req, cluster_info, req.signature, &tablet_infos); |
| SCOPED_ATTACH_TASK(engine_task.mem_tracker()); |
| auto status = engine_task.execute(); |
| // Return result to fe |
| TFinishTaskRequest finish_task_request; |
| finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request.__set_task_type(req.task_type); |
| finish_task_request.__set_signature(req.signature); |
| finish_task_request.__set_task_status(status.to_thrift()); |
| |
| if (!status.ok()) { |
| DorisMetrics::instance()->clone_requests_failed->increment(1); |
| LOG_WARNING("failed to clone tablet") |
| .tag("signature", req.signature) |
| .tag("tablet_id", clone_req.tablet_id) |
| .error(status); |
| } else { |
| LOG_INFO("successfully clone tablet") |
| .tag("signature", req.signature) |
| .tag("tablet_id", clone_req.tablet_id) |
| .tag("copy_size", engine_task.get_copy_size()) |
| .tag("copy_time_ms", engine_task.get_copy_time_ms()); |
| |
| if (engine_task.is_new_tablet()) { |
| increase_report_version(); |
| finish_task_request.__set_report_version(s_report_version); |
| } |
| finish_task_request.__set_finish_tablet_infos(tablet_infos); |
| finish_task_request.__set_copy_size(engine_task.get_copy_size()); |
| finish_task_request.__set_copy_time_ms(engine_task.get_copy_time_ms()); |
| } |
| |
| finish_task(finish_task_request); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
| void storage_medium_migrate_callback(StorageEngine& engine, const TAgentTaskRequest& req) { |
| const auto& storage_medium_migrate_req = req.storage_medium_migrate_req; |
| |
| // check request and get info |
| TabletSharedPtr tablet; |
| DataDir* dest_store = nullptr; |
| |
| auto status = check_migrate_request(engine, storage_medium_migrate_req, tablet, &dest_store); |
| if (status.ok()) { |
| EngineStorageMigrationTask engine_task(engine, tablet, dest_store); |
| SCOPED_ATTACH_TASK(engine_task.mem_tracker()); |
| status = engine_task.execute(); |
| } |
| // fe should ignore this err |
| if (status.is<FILE_ALREADY_EXIST>()) { |
| status = Status::OK(); |
| } |
| if (!status.ok()) { |
| LOG_WARNING("failed to migrate storage medium") |
| .tag("signature", req.signature) |
| .tag("tablet_id", storage_medium_migrate_req.tablet_id) |
| .error(status); |
| } else { |
| LOG_INFO("successfully migrate storage medium") |
| .tag("signature", req.signature) |
| .tag("tablet_id", storage_medium_migrate_req.tablet_id); |
| } |
| |
| TFinishTaskRequest finish_task_request; |
| finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request.__set_task_type(req.task_type); |
| finish_task_request.__set_signature(req.signature); |
| finish_task_request.__set_task_status(status.to_thrift()); |
| |
| finish_task(finish_task_request); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
| void calc_delete_bitmap_callback(CloudStorageEngine& engine, const TAgentTaskRequest& req) { |
| std::vector<TTabletId> error_tablet_ids; |
| std::vector<TTabletId> succ_tablet_ids; |
| Status status; |
| error_tablet_ids.clear(); |
| const auto& calc_delete_bitmap_req = req.calc_delete_bitmap_req; |
| CloudEngineCalcDeleteBitmapTask engine_task(engine, calc_delete_bitmap_req, &error_tablet_ids, |
| &succ_tablet_ids); |
| SCOPED_ATTACH_TASK(engine_task.mem_tracker()); |
| if (req.signature != calc_delete_bitmap_req.transaction_id) { |
| // transaction_id may not be the same as req.signature, so add a log here |
| LOG_INFO("begin to execute calc delete bitmap task") |
| .tag("signature", req.signature) |
| .tag("transaction_id", calc_delete_bitmap_req.transaction_id); |
| } |
| status = engine_task.execute(); |
| |
| TFinishTaskRequest finish_task_request; |
| if (!status) { |
| DorisMetrics::instance()->publish_task_failed_total->increment(1); |
| LOG_WARNING("failed to calculate delete bitmap") |
| .tag("signature", req.signature) |
| .tag("transaction_id", calc_delete_bitmap_req.transaction_id) |
| .tag("error_tablets_num", error_tablet_ids.size()) |
| .error(status); |
| } |
| |
| status.to_thrift(&finish_task_request.task_status); |
| finish_task_request.__set_backend(BackendOptions::get_local_backend()); |
| finish_task_request.__set_task_type(req.task_type); |
| finish_task_request.__set_signature(req.signature); |
| finish_task_request.__set_report_version(s_report_version); |
| finish_task_request.__set_error_tablet_ids(error_tablet_ids); |
| finish_task_request.__set_resp_partitions(calc_delete_bitmap_req.partitions); |
| |
| finish_task(finish_task_request); |
| remove_task_info(req.task_type, req.signature); |
| } |
| |
// Sweep trash data now (start_trash_sweep with the second arg true —
// presumably forces an immediate sweep; confirm against StorageEngine) and
// then notify the disk-state report listener so FE sees the freed space.
void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
    LOG(INFO) << "clean trash start";
    // Debug-point hook: artificially delay the sweep in tests.
    DBUG_EXECUTE_IF("clean_trash_callback_sleep", { sleep(100); })
    static_cast<void>(engine.start_trash_sweep(nullptr, true));
    static_cast<void>(engine.notify_listener("REPORT_DISK_STATE"));
    LOG(INFO) << "clean trash finish";
}
| |
| void clean_udf_cache_callback(const TAgentTaskRequest& req) { |
| if (doris::config::enable_java_support) { |
| LOG(INFO) << "clean udf cache start: " << req.clean_udf_cache_req.function_signature; |
| static_cast<void>( |
| JniUtil::clean_udf_class_load_cache(req.clean_udf_cache_req.function_signature)); |
| LOG(INFO) << "clean udf cache finish: " << req.clean_udf_cache_req.function_signature; |
| } |
| } |
| |
| void report_index_policy_callback(const ClusterInfo* cluster_info) { |
| TReportRequest request; |
| auto& index_policy_list = request.index_policy; |
| const auto& policys = doris::ExecEnv::GetInstance()->index_policy_mgr()->get_index_policys(); |
| for (const auto& policy : policys) { |
| index_policy_list.emplace_back(policy.second); |
| } |
| request.__isset.index_policy = true; |
| request.__set_backend(BackendOptions::get_local_backend()); |
| bool succ = handle_report(request, cluster_info, "index_policy"); |
| report_index_policy_total << 1; |
| if (!succ) [[unlikely]] { |
| report_index_policy_failed << 1; |
| } |
| } |
| |
| #include "common/compile_check_end.h" |
| } // namespace doris |