| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "meta/meta_bulk_load_service.h" |
| |
| #include <boost/cstdint.hpp> |
| #include <boost/lexical_cast.hpp> |
| // IWYU pragma: no_include <ext/alloc_traits.h> |
| #include <fmt/core.h> |
| #include <algorithm> |
| #include <chrono> |
| #include <cstdint> |
| |
| #include "block_service/block_service.h" |
| #include "block_service/block_service_manager.h" |
| #include "common/replica_envs.h" |
| #include "common/replication.codes.h" |
| #include "common/replication_enums.h" |
| #include "dsn.layer2_types.h" |
| #include "meta/meta_bulk_load_ingestion_context.h" |
| #include "meta/meta_data.h" |
| #include "meta/meta_service.h" |
| #include "meta/meta_state_service.h" |
| #include "meta/server_state.h" |
| #include "meta_admin_types.h" |
| #include "runtime/rpc/rpc_holder.h" |
| #include "runtime/rpc/rpc_message.h" |
| #include "runtime/rpc/serialization.h" |
| #include "runtime/task/async_calls.h" |
| #include "runtime/task/task.h" |
| #include "runtime/task/task_code.h" |
| #include "utils/autoref_ptr.h" |
| #include "utils/blob.h" |
| #include "utils/chrono_literals.h" |
| #include "utils/fail_point.h" |
| #include "utils/fmt_logging.h" |
| #include "utils/string_conv.h" |
| #include "absl/strings/string_view.h" |
| |
| DSN_DEFINE_uint32(meta_server, |
| bulk_load_max_rollback_times, |
| 10, |
| "if bulk load rollback time " |
| "exceed this value, meta won't " |
| "rollback bulk load process to " |
| "downloading, but turn it into " |
| "failed"); |
| DSN_TAG_VARIABLE(bulk_load_max_rollback_times, FT_MUTABLE); |
| |
| DSN_DEFINE_bool(meta_server, |
| bulk_load_verify_before_ingest, |
| false, |
| "verify files according to metadata before ingest"); |
| DSN_TAG_VARIABLE(bulk_load_verify_before_ingest, FT_MUTABLE); |
| |
| DSN_DEFINE_bool(meta_server, |
| enable_concurrent_bulk_load, |
| false, |
| "whether to enable different apps to execute bulk load at the same time"); |
| DSN_TAG_VARIABLE(enable_concurrent_bulk_load, FT_MUTABLE); |
| |
| namespace dsn { |
| namespace replication { |
| |
| bulk_load_service::bulk_load_service(meta_service *meta_svc, const std::string &bulk_load_dir) |
| : _meta_svc(meta_svc), _state(meta_svc->get_server_state()), _bulk_load_root(bulk_load_dir) |
| { |
| } |
| |
| // ThreadPool: THREAD_POOL_META_SERVER |
| void bulk_load_service::initialize_bulk_load_service() |
| { |
| _sync_bulk_load_storage = |
| std::make_unique<mss::meta_storage>(_meta_svc->get_remote_storage(), &_sync_tracker); |
| _ingestion_context = std::make_unique<ingestion_context>(); |
| |
| create_bulk_load_root_dir(); |
| _sync_tracker.wait_outstanding_tasks(); |
| |
| try_to_continue_bulk_load(); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_SERVER |
| void bulk_load_service::on_start_bulk_load(start_bulk_load_rpc rpc) |
| { |
| FAIL_POINT_INJECT_F("meta_on_start_bulk_load", |
| [=](absl::string_view) { rpc.response().err = ERR_OK; }); |
| |
| const auto &request = rpc.request(); |
| auto &response = rpc.response(); |
| response.err = ERR_OK; |
| |
| if (!FLAGS_enable_concurrent_bulk_load && |
| !_meta_svc->try_lock_meta_op_status(meta_op_status::BULKLOAD)) { |
| response.hint_msg = "meta server is busy now, please wait"; |
| LOG_ERROR("{}", response.hint_msg); |
| response.err = ERR_BUSY; |
| return; |
| } |
| |
| std::shared_ptr<app_state> app = get_app(request.app_name); |
| if (app == nullptr || app->status != app_status::AS_AVAILABLE) { |
| response.err = app == nullptr ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED; |
| response.hint_msg = fmt::format( |
| "app {} is ", response.err == ERR_APP_NOT_EXIST ? "not existed" : "not available"); |
| LOG_ERROR("{}", response.hint_msg); |
| _meta_svc->unlock_meta_op_status(); |
| return; |
| } |
| if (app->is_bulk_loading) { |
| response.err = ERR_BUSY; |
| response.hint_msg = fmt::format("app({}) is already executing bulk load", app->app_name); |
| LOG_ERROR("{}", response.hint_msg); |
| _meta_svc->unlock_meta_op_status(); |
| return; |
| } |
| |
| std::string hint_msg; |
| error_code e = check_bulk_load_request_params( |
| request, app->app_id, app->partition_count, app->envs, hint_msg); |
| if (e != ERR_OK) { |
| response.err = e; |
| response.hint_msg = hint_msg; |
| _meta_svc->unlock_meta_op_status(); |
| return; |
| } |
| |
| LOG_INFO("app({}) start bulk load, cluster_name = {}, provider = {}, remote root path = {}, " |
| "ingest_behind = {}", |
| request.app_name, |
| request.cluster_name, |
| request.file_provider_type, |
| request.remote_root_path, |
| request.ingest_behind); |
| |
| // clear old bulk load result |
| reset_local_bulk_load_states(app->app_id, app->app_name, true); |
| // avoid possible load balancing |
| _meta_svc->set_function_level(meta_function_level::fl_steady); |
| |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| _meta_svc->tracker(), |
| [this, rpc, app]() { do_start_app_bulk_load(std::move(app), std::move(rpc)); }, |
| server_state::sStateHash); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_SERVER |
| error_code |
| bulk_load_service::check_bulk_load_request_params(const start_bulk_load_request &request, |
| const int32_t app_id, |
| const int32_t partition_count, |
| const std::map<std::string, std::string> &envs, |
| std::string &hint_msg) |
| { |
| FAIL_POINT_INJECT_F("meta_check_bulk_load_request_params", |
| [](absl::string_view) -> error_code { return ERR_OK; }); |
| |
| if (!validate_ingest_behind(envs, request.ingest_behind)) { |
| hint_msg = fmt::format("inconsistent ingestion behind option"); |
| LOG_ERROR("{}", hint_msg); |
| return ERR_INCONSISTENT_STATE; |
| } |
| |
| auto file_provider = request.file_provider_type; |
| // check file provider |
| dsn::dist::block_service::block_filesystem *blk_fs = |
| _meta_svc->get_block_service_manager().get_or_create_block_filesystem(file_provider); |
| if (blk_fs == nullptr) { |
| LOG_ERROR("invalid remote file provider type: {}", file_provider); |
| hint_msg = "invalid file_provider"; |
| return ERR_INVALID_PARAMETERS; |
| } |
| |
| // sync get bulk_load_info file_handler |
| const std::string remote_path = |
| get_bulk_load_info_path(request.app_name, request.cluster_name, request.remote_root_path); |
| dsn::dist::block_service::create_file_request cf_req; |
| cf_req.file_name = remote_path; |
| cf_req.ignore_metadata = true; |
| error_code err = ERR_OK; |
| dsn::dist::block_service::block_file_ptr file_handler = nullptr; |
| blk_fs |
| ->create_file( |
| cf_req, |
| TASK_CODE_EXEC_INLINED, |
| [&err, &file_handler](const dsn::dist::block_service::create_file_response &resp) { |
| err = resp.err; |
| file_handler = resp.file_handle; |
| }) |
| ->wait(); |
| if (err != ERR_OK || file_handler == nullptr) { |
| LOG_ERROR( |
| "failed to get file({}) handler on remote provider({})", remote_path, file_provider); |
| hint_msg = "file_provider error"; |
| return ERR_FILE_OPERATION_FAILED; |
| } |
| |
| // sync read bulk_load_info on file provider |
| dsn::dist::block_service::read_response r_resp; |
| file_handler |
| ->read(dsn::dist::block_service::read_request{0, -1}, |
| TASK_CODE_EXEC_INLINED, |
| [&r_resp](const dsn::dist::block_service::read_response &resp) { r_resp = resp; }) |
| ->wait(); |
| if (r_resp.err != ERR_OK) { |
| LOG_ERROR("failed to read file({}) on remote provider({}), error = {}", |
| remote_path, |
| file_provider, |
| r_resp.err); |
| hint_msg = "read bulk_load_info failed"; |
| return r_resp.err; |
| } |
| |
| bulk_load_info bl_info; |
| if (!::dsn::json::json_forwarder<bulk_load_info>::decode(r_resp.buffer, bl_info)) { |
| LOG_ERROR("file({}) is damaged on remote file provider({})", remote_path, file_provider); |
| hint_msg = "bulk_load_info damaged"; |
| return ERR_CORRUPTION; |
| } |
| |
| if (bl_info.app_id != app_id || bl_info.partition_count != partition_count) { |
| LOG_ERROR("app({}) information is inconsistent, local app_id({}) VS remote app_id({}), " |
| "local partition_count({}) VS remote partition_count({})", |
| request.app_name, |
| app_id, |
| bl_info.app_id, |
| partition_count, |
| bl_info.partition_count); |
| hint_msg = "app_id or partition_count is inconsistent"; |
| return ERR_INCONSISTENT_STATE; |
| } |
| |
| return ERR_OK; |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::do_start_app_bulk_load(std::shared_ptr<app_state> app, |
| start_bulk_load_rpc rpc) |
| { |
| app_info info = *app; |
| info.__set_is_bulk_loading(true); |
| |
| blob value = dsn::json::json_forwarder<app_info>::encode(info); |
| _meta_svc->get_meta_storage()->set_data( |
| _state->get_app_path(*app), std::move(value), [app, rpc, this]() { |
| { |
| zauto_write_lock l(app_lock()); |
| app->is_bulk_loading = true; |
| } |
| { |
| zauto_write_lock l(_lock); |
| _bulk_load_app_id.insert(app->app_id); |
| _apps_in_progress_count[app->app_id] = app->partition_count; |
| } |
| create_app_bulk_load_dir( |
| app->app_name, app->app_id, app->partition_count, std::move(rpc)); |
| }); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::create_app_bulk_load_dir(const std::string &app_name, |
| int32_t app_id, |
| int32_t partition_count, |
| start_bulk_load_rpc rpc) |
| { |
| const auto &req = rpc.request(); |
| |
| app_bulk_load_info ainfo; |
| ainfo.app_id = app_id; |
| ainfo.app_name = app_name; |
| ainfo.partition_count = partition_count; |
| ainfo.status = bulk_load_status::BLS_DOWNLOADING; |
| ainfo.cluster_name = req.cluster_name; |
| ainfo.file_provider_type = req.file_provider_type; |
| ainfo.remote_root_path = req.remote_root_path; |
| ainfo.ingest_behind = req.ingest_behind; |
| ainfo.is_ever_ingesting = false; |
| ainfo.bulk_load_err = ERR_OK; |
| |
| _meta_svc->get_meta_storage()->delete_node_recursively( |
| get_app_bulk_load_path(app_id), [this, rpc, ainfo]() { |
| std::string bulk_load_path = get_app_bulk_load_path(ainfo.app_id); |
| LOG_INFO("remove app({}) bulk load dir {} succeed", ainfo.app_name, bulk_load_path); |
| |
| blob value = dsn::json::json_forwarder<app_bulk_load_info>::encode(ainfo); |
| _meta_svc->get_meta_storage()->create_node( |
| std::move(bulk_load_path), std::move(value), [this, rpc, ainfo]() { |
| LOG_DEBUG("create app({}) bulk load dir", ainfo.app_name); |
| { |
| zauto_write_lock l(_lock); |
| _app_bulk_load_info[ainfo.app_id] = ainfo; |
| _apps_pending_sync_flag[ainfo.app_id] = false; |
| _apps_rollback_count[ainfo.app_id] = 0; |
| } |
| for (int32_t i = 0; i < ainfo.partition_count; ++i) { |
| create_partition_bulk_load_dir(ainfo.app_name, |
| gpid(ainfo.app_id, i), |
| ainfo.partition_count, |
| std::move(rpc)); |
| } |
| }); |
| }); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::create_partition_bulk_load_dir(const std::string &app_name, |
| const gpid &pid, |
| int32_t partition_count, |
| start_bulk_load_rpc rpc) |
| { |
| partition_bulk_load_info pinfo; |
| pinfo.status = bulk_load_status::BLS_DOWNLOADING; |
| pinfo.ever_ingest_succeed = false; |
| blob value = dsn::json::json_forwarder<partition_bulk_load_info>::encode(pinfo); |
| |
| _meta_svc->get_meta_storage()->create_node( |
| get_partition_bulk_load_path(pid), |
| std::move(value), |
| [app_name, pid, partition_count, rpc, pinfo, this]() { |
| LOG_DEBUG("app({}) create partition({}) bulk_load_info", app_name, pid); |
| { |
| zauto_write_lock l(_lock); |
| _partition_bulk_load_info[pid] = pinfo; |
| _partitions_pending_sync_flag[pid] = false; |
| if (--_apps_in_progress_count[pid.get_app_id()] == 0) { |
| LOG_INFO("app({}) start bulk load succeed", app_name); |
| _apps_in_progress_count[pid.get_app_id()] = partition_count; |
| rpc.response().err = ERR_OK; |
| } |
| } |
| // start send bulk load to replica servers |
| partition_bulk_load(app_name, pid); |
| }); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| bool bulk_load_service::check_partition_status( |
| const std::string &app_name, |
| const gpid &pid, |
| bool always_unhealthy_check, |
| const std::function<void(const std::string &, const gpid &)> &retry_function, |
| /*out*/ partition_configuration &pconfig) |
| { |
| std::shared_ptr<app_state> app = get_app(pid.get_app_id()); |
| if (app == nullptr || app->status != app_status::AS_AVAILABLE) { |
| LOG_WARNING( |
| "app(name={}, id={}, status={}) is not existed or not available, set bulk load failed", |
| app_name, |
| pid.get_app_id(), |
| app ? dsn::enum_to_string(app->status) : "-"); |
| handle_app_unavailable(pid.get_app_id(), app_name); |
| return false; |
| } |
| |
| pconfig = app->partitions[pid.get_partition_index()]; |
| if (pconfig.primary.is_invalid()) { |
| LOG_WARNING("app({}) partition({}) primary is invalid, try it later", app_name, pid); |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| _meta_svc->tracker(), |
| [retry_function, app_name, pid]() { retry_function(app_name, pid); }, |
| 0, |
| std::chrono::seconds(1)); |
| return false; |
| } |
| |
| if (pconfig.secondaries.size() < pconfig.max_replica_count - 1) { |
| bulk_load_status::type p_status; |
| { |
| zauto_read_lock l(_lock); |
| p_status = get_partition_bulk_load_status_unlocked(pid); |
| } |
| // rollback to downloading, pause,cancel,failed bulk load should always send to replica |
| // server |
| if (!always_unhealthy_check && (p_status == bulk_load_status::BLS_DOWNLOADING || |
| p_status == bulk_load_status::BLS_PAUSING || |
| p_status == bulk_load_status::BLS_CANCELED || |
| p_status == bulk_load_status::BLS_FAILED)) { |
| return true; |
| } |
| LOG_WARNING("app({}) partition({}) is unhealthy, status({}), try it later", |
| app_name, |
| pid, |
| dsn::enum_to_string(p_status)); |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| _meta_svc->tracker(), |
| [retry_function, app_name, pid]() { retry_function(app_name, pid); }, |
| 0, |
| std::chrono::seconds(1)); |
| return false; |
| } |
| return true; |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::partition_bulk_load(const std::string &app_name, const gpid &pid) |
| { |
| FAIL_POINT_INJECT_F("meta_bulk_load_partition_bulk_load", [](absl::string_view) {}); |
| |
| partition_configuration pconfig; |
| if (!check_partition_status(app_name, |
| pid, |
| false, |
| std::bind(&bulk_load_service::partition_bulk_load, |
| this, |
| std::placeholders::_1, |
| std::placeholders::_2), |
| pconfig)) { |
| return; |
| } |
| |
| rpc_address primary_addr = pconfig.primary; |
| auto req = std::make_unique<bulk_load_request>(); |
| { |
| zauto_read_lock l(_lock); |
| const app_bulk_load_info &ainfo = _app_bulk_load_info[pid.get_app_id()]; |
| req->pid = pid; |
| req->app_name = app_name; |
| req->primary_addr = primary_addr; |
| req->remote_provider_name = ainfo.file_provider_type; |
| req->cluster_name = ainfo.cluster_name; |
| req->meta_bulk_load_status = get_partition_bulk_load_status_unlocked(pid); |
| req->ballot = pconfig.ballot; |
| req->query_bulk_load_metadata = is_partition_metadata_not_updated_unlocked(pid); |
| req->remote_root_path = ainfo.remote_root_path; |
| } |
| |
| LOG_INFO("send bulk load request to node({}), app({}), partition({}), partition " |
| "status = {}, remote provider = {}, cluster_name = {}, remote_root_path = {}", |
| primary_addr, |
| app_name, |
| pid, |
| dsn::enum_to_string(req->meta_bulk_load_status), |
| req->remote_provider_name, |
| req->cluster_name, |
| req->remote_root_path); |
| |
| bulk_load_rpc rpc(std::move(req), RPC_BULK_LOAD, 0_ms, 0, pid.thread_hash()); |
| rpc.call(primary_addr, _meta_svc->tracker(), [this, rpc](error_code err) mutable { |
| on_partition_bulk_load_reply(err, rpc.request(), rpc.response()); |
| }); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::on_partition_bulk_load_reply(error_code err, |
| const bulk_load_request &request, |
| const bulk_load_response &response) |
| { |
| const std::string &app_name = request.app_name; |
| const gpid &pid = request.pid; |
| const rpc_address &primary_addr = request.primary_addr; |
| |
| if (err != ERR_OK) { |
| LOG_ERROR( |
| "app({}), partition({}) failed to receive bulk load response from node({}), error = {}", |
| app_name, |
| pid, |
| primary_addr, |
| err); |
| try_rollback_to_downloading(app_name, pid); |
| try_resend_bulk_load_request(app_name, pid); |
| return; |
| } |
| |
| if (response.err == ERR_OBJECT_NOT_FOUND || response.err == ERR_INVALID_STATE) { |
| LOG_ERROR( |
| "app({}), partition({}) doesn't exist or has invalid state on node({}), error = {}", |
| app_name, |
| pid, |
| primary_addr, |
| response.err); |
| try_rollback_to_downloading(app_name, pid); |
| try_resend_bulk_load_request(app_name, pid); |
| return; |
| } |
| |
| if (response.err == ERR_BUSY) { |
| LOG_WARNING( |
| "node({}) has enough replicas downloading, wait for next round to send bulk load " |
| "request for app({}), partition({})", |
| primary_addr, |
| app_name, |
| pid); |
| try_resend_bulk_load_request(app_name, pid); |
| return; |
| } |
| |
| if (response.err != ERR_OK) { |
| LOG_ERROR("app({}), partition({}) from node({}) handle bulk load response failed, error = " |
| "{}, primary status = {}", |
| app_name, |
| pid, |
| primary_addr, |
| response.err, |
| dsn::enum_to_string(response.primary_bulk_load_status)); |
| handle_bulk_load_failed(pid.get_app_id(), response.err); |
| try_resend_bulk_load_request(app_name, pid); |
| return; |
| } |
| |
| // response.err = ERR_OK |
| std::shared_ptr<app_state> app = get_app(pid.get_app_id()); |
| if (app == nullptr || app->status != app_status::AS_AVAILABLE) { |
| LOG_WARNING( |
| "app(name={}, id={}) is not existed, set bulk load failed", app_name, pid.get_app_id()); |
| handle_app_unavailable(pid.get_app_id(), app_name); |
| return; |
| } |
| ballot current_ballot = app->partitions[pid.get_partition_index()].ballot; |
| if (request.ballot < current_ballot) { |
| LOG_WARNING( |
| "receive out-date response from node({}), app({}), partition({}), request ballot = " |
| "{}, current ballot= {}", |
| primary_addr, |
| app_name, |
| pid, |
| request.ballot, |
| current_ballot); |
| try_rollback_to_downloading(app_name, pid); |
| try_resend_bulk_load_request(app_name, pid); |
| return; |
| } |
| |
| // handle bulk load states reported from primary replica |
| bulk_load_status::type app_status = get_app_bulk_load_status(response.pid.get_app_id()); |
| switch (app_status) { |
| case bulk_load_status::BLS_DOWNLOADING: |
| handle_app_downloading(response, primary_addr); |
| break; |
| case bulk_load_status::BLS_DOWNLOADED: |
| update_partition_info_on_remote_storage( |
| response.app_name, response.pid, bulk_load_status::BLS_INGESTING); |
| // when app status is downloaded or ingesting, send request frequently |
| break; |
| case bulk_load_status::BLS_INGESTING: |
| handle_app_ingestion(response, primary_addr); |
| break; |
| case bulk_load_status::BLS_SUCCEED: |
| case bulk_load_status::BLS_FAILED: |
| case bulk_load_status::BLS_CANCELED: |
| handle_bulk_load_finish(response, primary_addr); |
| break; |
| case bulk_load_status::BLS_PAUSING: |
| handle_app_pausing(response, primary_addr); |
| break; |
| case bulk_load_status::BLS_PAUSED: |
| // paused not send request to replica servers |
| return; |
| default: |
| // do nothing in other status |
| break; |
| } |
| |
| try_resend_bulk_load_request(app_name, pid); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::try_resend_bulk_load_request(const std::string &app_name, const gpid &pid) |
| { |
| FAIL_POINT_INJECT_F("meta_bulk_load_resend_request", [](absl::string_view) {}); |
| zauto_read_lock l(_lock); |
| if (is_app_bulk_loading_unlocked(pid.get_app_id())) { |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| _meta_svc->tracker(), |
| std::bind(&bulk_load_service::partition_bulk_load, this, app_name, pid), |
| 0, |
| std::chrono::seconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL)); |
| } |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::handle_app_downloading(const bulk_load_response &response, |
| const rpc_address &primary_addr) |
| { |
| const std::string &app_name = response.app_name; |
| const gpid &pid = response.pid; |
| |
| if (!response.__isset.total_download_progress) { |
| LOG_WARNING( |
| "receive bulk load response from node({}) app({}), partition({}), primary_status({}), " |
| "but total_download_progress is not set", |
| primary_addr, |
| app_name, |
| pid, |
| dsn::enum_to_string(response.primary_bulk_load_status)); |
| return; |
| } |
| |
| for (const auto &kv : response.group_bulk_load_state) { |
| const auto &bulk_load_states = kv.second; |
| if (!bulk_load_states.__isset.download_progress || |
| !bulk_load_states.__isset.download_status) { |
| LOG_WARNING("receive bulk load response from node({}) app({}), partition({}), " |
| "primary_status({}), but node({}) progress or status is not set", |
| primary_addr, |
| app_name, |
| pid, |
| dsn::enum_to_string(response.primary_bulk_load_status), |
| kv.first); |
| return; |
| } |
| // check partition download status |
| if (bulk_load_states.download_status != ERR_OK) { |
| LOG_ERROR("app({}) partition({}) on node({}) meet unrecoverable error during " |
| "downloading files, error = {}", |
| app_name, |
| pid, |
| kv.first, |
| bulk_load_states.download_status); |
| |
| error_code err = ERR_UNKNOWN; |
| // ERR_FILE_OPERATION_FAILED: local file system error |
| // ERR_FS_INTERNAL: remote file system error |
| // ERR_CORRUPTION: file not exist or damaged |
| if (ERR_FILE_OPERATION_FAILED == bulk_load_states.download_status || |
| ERR_FS_INTERNAL == bulk_load_states.download_status || |
| ERR_CORRUPTION == bulk_load_states.download_status) { |
| err = bulk_load_states.download_status; |
| } |
| handle_bulk_load_failed(pid.get_app_id(), err); |
| return; |
| } |
| } |
| |
| // if replica report metadata, update metadata on remote storage |
| if (response.__isset.metadata && is_partition_metadata_not_updated(pid)) { |
| update_partition_metadata_on_remote_storage(app_name, pid, response.metadata); |
| } |
| |
| // update download progress |
| int32_t total_progress = response.total_download_progress; |
| LOG_INFO("receive bulk load response from node({}) app({}) partition({}), primary_status({}), " |
| "total_download_progress = {}", |
| primary_addr, |
| app_name, |
| pid, |
| dsn::enum_to_string(response.primary_bulk_load_status), |
| total_progress); |
| { |
| zauto_write_lock l(_lock); |
| _partitions_total_download_progress[pid] = total_progress; |
| _partitions_bulk_load_state[pid] = response.group_bulk_load_state; |
| } |
| |
| // update partition status to `downloaded` if all replica downloaded |
| if (total_progress >= bulk_load_constant::PROGRESS_FINISHED) { |
| LOG_INFO( |
| "app({}) partirion({}) download all files from remote provider succeed", app_name, pid); |
| update_partition_info_on_remote_storage(app_name, pid, bulk_load_status::BLS_DOWNLOADED); |
| } |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::handle_app_ingestion(const bulk_load_response &response, |
| const rpc_address &primary_addr) |
| { |
| const std::string &app_name = response.app_name; |
| const gpid &pid = response.pid; |
| |
| if (!response.__isset.is_group_ingestion_finished) { |
| LOG_WARNING("receive bulk load response from node({}) app({}) partition({}), " |
| "primary_status({}), but is_group_ingestion_finished is not set", |
| primary_addr, |
| app_name, |
| pid, |
| dsn::enum_to_string(response.primary_bulk_load_status)); |
| return; |
| } |
| |
| for (const auto &kv : response.group_bulk_load_state) { |
| const auto &bulk_load_states = kv.second; |
| if (!bulk_load_states.__isset.ingest_status) { |
| LOG_WARNING("receive bulk load response from node({}) app({}) partition({}), " |
| "primary_status({}), but node({}) ingestion_status is not set", |
| primary_addr, |
| app_name, |
| pid, |
| dsn::enum_to_string(response.primary_bulk_load_status), |
| kv.first); |
| return; |
| } |
| |
| if (bulk_load_states.ingest_status == ingestion_status::IS_FAILED) { |
| LOG_ERROR( |
| "app({}) partition({}) on node({}) ingestion failed", app_name, pid, kv.first); |
| finish_ingestion(pid); |
| handle_bulk_load_failed(pid.get_app_id(), ERR_INGESTION_FAILED); |
| return; |
| } |
| } |
| |
| LOG_INFO("receive bulk load response from node({}) app({}) partition({}), primary_status({}), " |
| "is_group_ingestion_finished = {}", |
| primary_addr, |
| app_name, |
| pid, |
| dsn::enum_to_string(response.primary_bulk_load_status), |
| response.is_group_ingestion_finished); |
| { |
| zauto_write_lock l(_lock); |
| _partitions_bulk_load_state[pid] = response.group_bulk_load_state; |
| } |
| |
| if (response.is_group_ingestion_finished) { |
| LOG_INFO("app({}) partition({}) ingestion files succeed", app_name, pid); |
| finish_ingestion(pid); |
| update_partition_info_on_remote_storage(app_name, pid, bulk_load_status::BLS_SUCCEED); |
| } |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &response, |
| const rpc_address &primary_addr) |
| { |
| const std::string &app_name = response.app_name; |
| const gpid &pid = response.pid; |
| |
| if (!response.__isset.is_group_bulk_load_context_cleaned_up) { |
| LOG_WARNING("receive bulk load response from node({}) app({}) partition({}), " |
| "primary_status({}), but is_group_bulk_load_context_cleaned_up is not set", |
| primary_addr, |
| app_name, |
| pid, |
| dsn::enum_to_string(response.primary_bulk_load_status)); |
| return; |
| } |
| |
| for (const auto &kv : response.group_bulk_load_state) { |
| if (!kv.second.__isset.is_cleaned_up) { |
| LOG_WARNING("receive bulk load response from node({}) app({}), partition({}), " |
| "primary_status({}), but node({}) is_cleaned_up is not set", |
| primary_addr, |
| app_name, |
| pid, |
| dsn::enum_to_string(response.primary_bulk_load_status), |
| kv.first); |
| return; |
| } |
| } |
| |
| { |
| zauto_read_lock l(_lock); |
| if (_partitions_cleaned_up[pid]) { |
| LOG_WARNING( |
| "receive bulk load response from node({}) app({}) partition({}), current partition " |
| "has already been cleaned up", |
| primary_addr, |
| app_name, |
| pid); |
| return; |
| } |
| } |
| |
| // The replicas have cleaned up their bulk load states and removed temporary sst files |
| bool group_cleaned_up = response.is_group_bulk_load_context_cleaned_up; |
| LOG_INFO("receive bulk load response from node({}) app({}) partition({}), primary status = {}, " |
| "is_group_bulk_load_context_cleaned_up = {}", |
| primary_addr, |
| app_name, |
| pid, |
| dsn::enum_to_string(response.primary_bulk_load_status), |
| group_cleaned_up); |
| { |
| zauto_write_lock l(_lock); |
| _partitions_cleaned_up[pid] = group_cleaned_up; |
| _partitions_bulk_load_state[pid] = response.group_bulk_load_state; |
| } |
| |
| if (group_cleaned_up) { |
| int32_t count = 0; |
| { |
| zauto_write_lock l(_lock); |
| count = --_apps_in_progress_count[pid.get_app_id()]; |
| } |
| if (count == 0) { |
| std::shared_ptr<app_state> app = get_app(pid.get_app_id()); |
| if (app == nullptr || app->status != app_status::AS_AVAILABLE) { |
| LOG_WARNING("app(name={}, id={}) is not existed, remove bulk load dir on remote " |
| "storage", |
| app_name, |
| pid.get_app_id()); |
| remove_bulk_load_dir_on_remote_storage(pid.get_app_id(), app_name); |
| return; |
| } |
| LOG_INFO("app({}) update app to not bulk loading", app_name); |
| update_app_not_bulk_loading_on_remote_storage(std::move(app)); |
| reset_local_bulk_load_states(pid.get_app_id(), app_name, false); |
| } |
| } |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::handle_app_pausing(const bulk_load_response &response, |
| const rpc_address &primary_addr) |
| { |
| const std::string &app_name = response.app_name; |
| const gpid &pid = response.pid; |
| |
| if (!response.__isset.is_group_bulk_load_paused) { |
| LOG_WARNING("receive bulk load response from node({}) app({}) partition({}), " |
| "primary_status({}), but is_group_bulk_load_paused is not set", |
| primary_addr, |
| app_name, |
| pid, |
| dsn::enum_to_string(response.primary_bulk_load_status)); |
| return; |
| } |
| |
| for (const auto &kv : response.group_bulk_load_state) { |
| if (!kv.second.__isset.is_paused) { |
| LOG_WARNING("receive bulk load response from node({}) app({}), partition({}), " |
| "primary_status({}), but node({}) is_paused is not set", |
| primary_addr, |
| app_name, |
| pid, |
| dsn::enum_to_string(response.primary_bulk_load_status), |
| kv.first); |
| return; |
| } |
| } |
| |
| bool is_group_paused = response.is_group_bulk_load_paused; |
| LOG_INFO("receive bulk load response from node({}) app({}) partition({}), primary status = {}, " |
| "is_group_bulk_load_paused = {}", |
| primary_addr, |
| app_name, |
| pid, |
| dsn::enum_to_string(response.primary_bulk_load_status), |
| is_group_paused); |
| { |
| zauto_write_lock l(_lock); |
| _partitions_bulk_load_state[pid] = response.group_bulk_load_state; |
| } |
| |
| if (is_group_paused) { |
| LOG_INFO("app({}) partirion({}) pause bulk load succeed", response.app_name, pid); |
| update_partition_info_on_remote_storage( |
| response.app_name, pid, bulk_load_status::BLS_PAUSED); |
| } |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::try_rollback_to_downloading(const std::string &app_name, const gpid &pid) |
| { |
| zauto_write_lock l(_lock); |
| |
| const auto app_status = get_app_bulk_load_status_unlocked(pid.get_app_id()); |
| if (app_status != bulk_load_status::BLS_DOWNLOADING && |
| app_status != bulk_load_status::BLS_DOWNLOADED && |
| app_status != bulk_load_status::BLS_INGESTING) { |
| LOG_INFO("app({}) status={}, no need to rollback to downloading, wait for next round", |
| app_name, |
| dsn::enum_to_string(app_status)); |
| return; |
| } |
| |
| if (_apps_rolling_back[pid.get_app_id()]) { |
| LOG_WARNING("app({}) is rolling back to downloading, ignore this request", app_name); |
| return; |
| } |
| if (_apps_rollback_count[pid.get_app_id()] >= FLAGS_bulk_load_max_rollback_times) { |
| LOG_WARNING( |
| "app({}) has been rollback to downloading for {} times, make bulk load process failed", |
| app_name, |
| _apps_rollback_count[pid.get_app_id()]); |
| |
| update_app_status_on_remote_storage_unlocked( |
| pid.get_app_id(), |
| bulk_load_status::BLS_FAILED, |
| _app_bulk_load_info[pid.get_app_id()].is_ever_ingesting ? ERR_INGESTION_FAILED |
| : ERR_RETRY_EXHAUSTED); |
| return; |
| } |
| LOG_INFO("app({}) will rolling back from {} to {}, current rollback_count = {}", |
| app_name, |
| dsn::enum_to_string(app_status), |
| dsn::enum_to_string(bulk_load_status::BLS_DOWNLOADING), |
| _apps_rollback_count[pid.get_app_id()]); |
| _apps_rolling_back[pid.get_app_id()] = true; |
| _apps_rollback_count[pid.get_app_id()]++; |
| update_app_status_on_remote_storage_unlocked(pid.get_app_id(), |
| bulk_load_status::type::BLS_DOWNLOADING); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::handle_bulk_load_failed(int32_t app_id, error_code err) |
| { |
| zauto_write_lock l(_lock); |
| if (!_apps_cleaning_up[app_id]) { |
| _apps_cleaning_up[app_id] = true; |
| update_app_status_on_remote_storage_unlocked(app_id, bulk_load_status::BLS_FAILED, err); |
| } |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::handle_app_unavailable(int32_t app_id, const std::string &app_name) |
| { |
| zauto_write_lock l(_lock); |
| if (is_app_bulk_loading_unlocked(app_id) && !_apps_cleaning_up[app_id]) { |
| _apps_cleaning_up[app_id] = true; |
| reset_local_bulk_load_states_unlocked(app_id, app_name, false); |
| } |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::update_partition_metadata_on_remote_storage( |
| const std::string &app_name, const gpid &pid, const bulk_load_metadata &metadata) |
| { |
| zauto_read_lock l(_lock); |
| partition_bulk_load_info pinfo = _partition_bulk_load_info[pid]; |
| pinfo.metadata = metadata; |
| blob value = json::json_forwarder<partition_bulk_load_info>::encode(pinfo); |
| |
| _meta_svc->get_meta_storage()->set_data( |
| get_partition_bulk_load_path(pid), std::move(value), [this, app_name, pid, pinfo]() { |
| zauto_write_lock l(_lock); |
| _partition_bulk_load_info[pid] = pinfo; |
| LOG_INFO( |
| "app({}) update partition({}) bulk load metadata, file count = {}, file size = {}", |
| app_name, |
| pid, |
| pinfo.metadata.files.size(), |
| pinfo.metadata.file_total_size); |
| }); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::update_partition_info_on_remote_storage(const std::string &app_name, |
| const gpid &pid, |
| bulk_load_status::type new_status, |
| bool should_send_request) |
| { |
| zauto_write_lock l(_lock); |
| partition_bulk_load_info pinfo = _partition_bulk_load_info[pid]; |
| if (pinfo.status == new_status && new_status != bulk_load_status::BLS_DOWNLOADING) { |
| LOG_WARNING("app({}) partition({}) old status:{} VS new status:{}, ignore it", |
| app_name, |
| pid, |
| dsn::enum_to_string(pinfo.status), |
| dsn::enum_to_string(new_status)); |
| return; |
| } |
| |
| if (_partitions_pending_sync_flag[pid]) { |
| if (_apps_rolling_back[pid.get_app_id()] && |
| new_status == bulk_load_status::BLS_DOWNLOADING) { |
| LOG_WARNING( |
| "app({}) partition({}) has already sync bulk load status, current_status = {}, " |
| "wait and retry to set status as {}", |
| app_name, |
| pid, |
| dsn::enum_to_string(pinfo.status), |
| dsn::enum_to_string(new_status)); |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| _meta_svc->tracker(), |
| std::bind(&bulk_load_service::update_partition_info_on_remote_storage, |
| this, |
| app_name, |
| pid, |
| new_status, |
| should_send_request), |
| 0, |
| std::chrono::seconds(1)); |
| } else { |
| LOG_INFO("app({}) partition({}) has already sync bulk load status, current_status = " |
| "{}, new_status = {}, wait for next round", |
| app_name, |
| pid, |
| dsn::enum_to_string(pinfo.status), |
| dsn::enum_to_string(new_status)); |
| } |
| return; |
| } |
| |
| _partitions_pending_sync_flag[pid] = true; |
| update_partition_info_unlock(pid, new_status, pinfo); |
| |
| blob value = json::json_forwarder<partition_bulk_load_info>::encode(pinfo); |
| _meta_svc->get_meta_storage()->set_data( |
| get_partition_bulk_load_path(pid), |
| std::move(value), |
| std::bind(&bulk_load_service::update_partition_info_on_remote_storage_reply, |
| this, |
| app_name, |
| pid, |
| pinfo, |
| should_send_request)); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::update_partition_info_unlock(const gpid &pid, |
| bulk_load_status::type new_status, |
| /*out*/ partition_bulk_load_info &pinfo) |
| { |
| auto old_status = pinfo.status; |
| pinfo.status = new_status; |
| if (old_status != bulk_load_status::BLS_INGESTING || |
| new_status != bulk_load_status::BLS_SUCCEED || |
| _partitions_bulk_load_state.find(pid) == _partitions_bulk_load_state.end()) { |
| // no need to update other field of partition_bulk_load_info |
| return; |
| } |
| pinfo.addresses.clear(); |
| const auto &state = _partitions_bulk_load_state[pid]; |
| for (const auto &kv : state) { |
| pinfo.addresses.emplace_back(kv.first); |
| } |
| pinfo.ever_ingest_succeed = true; |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::update_partition_info_on_remote_storage_reply( |
| const std::string &app_name, |
| const gpid &pid, |
| const partition_bulk_load_info &new_info, |
| bool should_send_request) |
| { |
| { |
| zauto_write_lock l(_lock); |
| auto old_status = _partition_bulk_load_info[pid].status; |
| auto new_status = new_info.status; |
| _partition_bulk_load_info[pid] = new_info; |
| _partitions_pending_sync_flag[pid] = false; |
| |
| LOG_INFO("app({}) update partition({}) status from {} to {}", |
| app_name, |
| pid, |
| dsn::enum_to_string(old_status), |
| dsn::enum_to_string(new_status)); |
| |
| switch (new_status) { |
| case bulk_load_status::BLS_DOWNLOADED: |
| case bulk_load_status::BLS_INGESTING: |
| case bulk_load_status::BLS_SUCCEED: |
| case bulk_load_status::BLS_PAUSED: |
| if (old_status != new_status && !_apps_rolling_back[pid.get_app_id()] && |
| --_apps_in_progress_count[pid.get_app_id()] == 0) { |
| update_app_status_on_remote_storage_unlocked(pid.get_app_id(), new_status); |
| } |
| break; |
| case bulk_load_status::BLS_DOWNLOADING: { |
| _partitions_bulk_load_state.erase(pid); |
| _partitions_total_download_progress[pid] = 0; |
| _partitions_cleaned_up[pid] = false; |
| |
| if (--_apps_in_progress_count[pid.get_app_id()] == 0) { |
| _apps_in_progress_count[pid.get_app_id()] = |
| _app_bulk_load_info[pid.get_app_id()].partition_count; |
| _apps_rolling_back[pid.get_app_id()] = false; |
| LOG_INFO("app({}) restart to bulk load", app_name); |
| } |
| } break; |
| default: |
| // do nothing in other status |
| break; |
| } |
| } |
| if (should_send_request) { |
| partition_bulk_load(app_name, pid); |
| } |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::update_app_status_on_remote_storage_unlocked( |
| int32_t app_id, bulk_load_status::type new_status, error_code err, bool should_send_request) |
| { |
| FAIL_POINT_INJECT_F("meta_update_app_status_on_remote_storage_unlocked", |
| [](absl::string_view) {}); |
| |
| app_bulk_load_info ainfo = _app_bulk_load_info[app_id]; |
| auto old_status = ainfo.status; |
| |
| if (old_status == new_status && new_status != bulk_load_status::BLS_DOWNLOADING) { |
| LOG_WARNING("app({}) old status:{} VS new status:{}, ignore it", |
| ainfo.app_name, |
| dsn::enum_to_string(old_status), |
| dsn::enum_to_string(new_status)); |
| return; |
| } |
| |
| if (_apps_pending_sync_flag[app_id]) { |
| LOG_INFO("app({}) has already sync bulk load status, wait and retry, current status = {}, " |
| "new status = {}", |
| ainfo.app_name, |
| dsn::enum_to_string(old_status), |
| dsn::enum_to_string(new_status)); |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| _meta_svc->tracker(), |
| std::bind(&bulk_load_service::update_app_status_on_remote_storage_unlocked, |
| this, |
| app_id, |
| new_status, |
| err, |
| should_send_request), |
| 0, |
| std::chrono::seconds(1)); |
| return; |
| } |
| |
| _apps_pending_sync_flag[app_id] = true; |
| |
| if (bulk_load_status::BLS_INGESTING == new_status) { |
| ainfo.is_ever_ingesting = true; |
| } |
| ainfo.status = new_status; |
| ainfo.bulk_load_err = err; |
| blob value = dsn::json::json_forwarder<app_bulk_load_info>::encode(ainfo); |
| |
| _meta_svc->get_meta_storage()->set_data( |
| get_app_bulk_load_path(app_id), |
| std::move(value), |
| std::bind(&bulk_load_service::update_app_status_on_remote_storage_reply, |
| this, |
| ainfo, |
| old_status, |
| new_status, |
| should_send_request)); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::update_app_status_on_remote_storage_reply(const app_bulk_load_info &ainfo, |
| bulk_load_status::type old_status, |
| bulk_load_status::type new_status, |
| bool should_send_request) |
| { |
| int32_t app_id = ainfo.app_id; |
| int32_t partition_count = ainfo.partition_count; |
| { |
| zauto_write_lock l(_lock); |
| _app_bulk_load_info[app_id] = ainfo; |
| _apps_pending_sync_flag[app_id] = false; |
| _apps_in_progress_count[app_id] = partition_count; |
| // when rollback from ingesting, ingesting_count should be reset |
| if (old_status == bulk_load_status::BLS_INGESTING && |
| new_status == bulk_load_status::BLS_DOWNLOADING) { |
| reset_app_ingestion(app_id); |
| } |
| } |
| |
| LOG_INFO("update app({}) status from {} to {}", |
| ainfo.app_name, |
| dsn::enum_to_string(old_status), |
| dsn::enum_to_string(new_status)); |
| |
| if (new_status == bulk_load_status::BLS_INGESTING) { |
| for (auto i = 0; i < partition_count; ++i) { |
| partition_ingestion(ainfo.app_name, gpid(app_id, i)); |
| } |
| } |
| |
| if (new_status == bulk_load_status::BLS_PAUSING || |
| new_status == bulk_load_status::BLS_DOWNLOADING || |
| new_status == bulk_load_status::BLS_CANCELED || |
| new_status == bulk_load_status::BLS_FAILED) { |
| for (int i = 0; i < ainfo.partition_count; ++i) { |
| update_partition_info_on_remote_storage( |
| ainfo.app_name, gpid(app_id, i), new_status, should_send_request); |
| } |
| } |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| bool bulk_load_service::check_ever_ingestion_succeed(const partition_configuration &config, |
| const std::string &app_name, |
| const gpid &pid) |
| { |
| partition_bulk_load_info pinfo; |
| { |
| zauto_read_lock l(_lock); |
| pinfo = _partition_bulk_load_info[pid]; |
| } |
| |
| if (!pinfo.ever_ingest_succeed) { |
| return false; |
| } |
| |
| std::vector<rpc_address> current_nodes; |
| current_nodes.emplace_back(config.primary); |
| for (const auto &secondary : config.secondaries) { |
| current_nodes.emplace_back(secondary); |
| } |
| |
| std::sort(pinfo.addresses.begin(), pinfo.addresses.end()); |
| std::sort(current_nodes.begin(), current_nodes.end()); |
| if (current_nodes == pinfo.addresses) { |
| LOG_INFO("app({}) partition({}) has already executed ingestion succeed", app_name, pid); |
| update_partition_info_on_remote_storage(app_name, pid, bulk_load_status::BLS_SUCCEED); |
| return true; |
| } |
| |
| LOG_WARNING("app({}) partition({}) configuration changed, should executed ingestion again", |
| app_name, |
| pid); |
| return false; |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::partition_ingestion(const std::string &app_name, const gpid &pid) |
| { |
| FAIL_POINT_INJECT_F("meta_bulk_load_partition_ingestion", [](absl::string_view) {}); |
| |
| auto app_status = get_app_bulk_load_status(pid.get_app_id()); |
| if (app_status != bulk_load_status::BLS_INGESTING) { |
| LOG_WARNING("app({}) current status is {}, partition({}), ignore it", |
| app_name, |
| dsn::enum_to_string(app_status), |
| pid); |
| return; |
| } |
| |
| if (is_partition_metadata_not_updated(pid)) { |
| LOG_ERROR("app({}) partition({}) doesn't have bulk load metadata, set bulk load failed", |
| app_name, |
| pid); |
| handle_bulk_load_failed(pid.get_app_id(), ERR_CORRUPTION); |
| return; |
| } |
| |
| partition_configuration pconfig; |
| if (!check_partition_status(app_name, |
| pid, |
| true, |
| std::bind(&bulk_load_service::partition_ingestion, |
| this, |
| std::placeholders::_1, |
| std::placeholders::_2), |
| pconfig)) { |
| return; |
| } |
| |
| if (check_ever_ingestion_succeed(pconfig, app_name, pid)) { |
| return; |
| } |
| |
| auto app = get_app(pid.get_app_id()); |
| if (!try_partition_ingestion(pconfig, app->helpers->contexts[pid.get_partition_index()])) { |
| LOG_WARNING( |
| "app({}) partition({}) couldn't execute ingestion, wait and try later", app_name, pid); |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| _meta_svc->tracker(), |
| std::bind(&bulk_load_service::partition_ingestion, this, app_name, pid), |
| pid.thread_hash(), |
| std::chrono::seconds(5)); |
| return; |
| } |
| |
| rpc_address primary_addr = pconfig.primary; |
| ballot meta_ballot = pconfig.ballot; |
| tasking::enqueue(LPC_BULK_LOAD_INGESTION, |
| _meta_svc->tracker(), |
| std::bind(&bulk_load_service::send_ingestion_request, |
| this, |
| app_name, |
| pid, |
| primary_addr, |
| meta_ballot), |
| 0, |
| std::chrono::seconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL)); |
| } |
| |
| // ThreadPool: THREAD_POOL_DEFAULT |
| void bulk_load_service::send_ingestion_request(const std::string &app_name, |
| const gpid &pid, |
| const rpc_address &primary_addr, |
| const ballot &meta_ballot) |
| { |
| ingestion_request req; |
| req.app_name = app_name; |
| req.ballot = meta_ballot; |
| req.verify_before_ingest = FLAGS_bulk_load_verify_before_ingest; |
| { |
| zauto_read_lock l(_lock); |
| req.metadata = _partition_bulk_load_info[pid].metadata; |
| req.ingest_behind = _app_bulk_load_info[pid.get_app_id()].ingest_behind; |
| } |
| // create a client request, whose gpid field in header should be pid |
| message_ex *msg = message_ex::create_request(dsn::apps::RPC_RRDB_RRDB_BULK_LOAD, |
| 0, |
| pid.thread_hash(), |
| static_cast<uint64_t>(pid.get_partition_index())); |
| auto &hdr = *msg->header; |
| hdr.gpid = pid; |
| dsn::marshall(msg, req); |
| dsn::rpc_response_task_ptr rpc_callback = rpc::create_rpc_response_task( |
| msg, |
| _meta_svc->tracker(), |
| [this, app_name, pid, primary_addr](error_code err, ingestion_response &&resp) { |
| on_partition_ingestion_reply(err, std::move(resp), app_name, pid, primary_addr); |
| }); |
| _meta_svc->send_request(msg, primary_addr, rpc_callback); |
| LOG_INFO("send ingest_request to node({}), app({}) partition({})", primary_addr, app_name, pid); |
| } |
| |
| // ThreadPool: THREAD_POOL_DEFAULT |
| void bulk_load_service::on_partition_ingestion_reply(error_code err, |
| const ingestion_response &&resp, |
| const std::string &app_name, |
| const gpid &pid, |
| const rpc_address &primary_addr) |
| { |
| if (err != ERR_OK || resp.err != ERR_OK || resp.rocksdb_error != ERR_OK) { |
| finish_ingestion(pid); |
| } |
| |
| if (err == ERR_NO_NEED_OPERATE) { |
| LOG_WARNING( |
| "app({}) partition({}) on node({}) has already executing ingestion, ignore this " |
| "repeated request", |
| app_name, |
| pid, |
| primary_addr); |
| return; |
| } |
| |
| // if meet 2pc error, ingesting will rollback to downloading, no need to retry here |
| if (err != ERR_OK) { |
| LOG_ERROR("app({}) partition({}) on node({}) ingestion files failed, error = {}", |
| app_name, |
| pid, |
| primary_addr, |
| err); |
| tasking::enqueue( |
| LPC_META_STATE_NORMAL, |
| _meta_svc->tracker(), |
| std::bind(&bulk_load_service::try_rollback_to_downloading, this, app_name, pid)); |
| return; |
| } |
| |
| if (resp.err == ERR_TRY_AGAIN && resp.rocksdb_error != 0) { |
| LOG_ERROR("app({}) partition({}) on node({}) ingestion files failed while empty write, " |
| "rocksdb error = " |
| "{}, retry it later", |
| app_name, |
| pid, |
| primary_addr, |
| resp.rocksdb_error); |
| tasking::enqueue(LPC_BULK_LOAD_INGESTION, |
| _meta_svc->tracker(), |
| std::bind(&bulk_load_service::partition_ingestion, this, app_name, pid), |
| 0, |
| std::chrono::milliseconds(10)); |
| return; |
| } |
| |
| // some unexpected errors happened, such as write empty write failed but rocksdb_error is ok |
| // stop bulk load process with failed |
| if (resp.err != ERR_OK || resp.rocksdb_error != 0) { |
| LOG_ERROR( |
| "app({}) partition({}) on node({}) failed to ingestion files, error = {}, rocksdb " |
| "error = {}", |
| app_name, |
| pid, |
| primary_addr, |
| resp.err, |
| resp.rocksdb_error); |
| |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| _meta_svc->tracker(), |
| std::bind(&bulk_load_service::handle_bulk_load_failed, |
| this, |
| pid.get_app_id(), |
| ERR_INGESTION_FAILED)); |
| return; |
| } |
| |
| LOG_INFO("app({}) partition({}) receive ingestion response from node({}) succeed", |
| app_name, |
| pid, |
| primary_addr); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::remove_bulk_load_dir_on_remote_storage(int32_t app_id, |
| const std::string &app_name) |
| { |
| std::string bulk_load_path = get_app_bulk_load_path(app_id); |
| _meta_svc->get_meta_storage()->delete_node_recursively( |
| std::move(bulk_load_path), [this, app_id, app_name, bulk_load_path]() { |
| LOG_INFO("remove app({}) bulk load dir {} succeed", app_name, bulk_load_path); |
| reset_local_bulk_load_states(app_id, app_name, true); |
| }); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::remove_bulk_load_dir_on_remote_storage(std::shared_ptr<app_state> app, |
| bool set_app_not_bulk_loading) |
| { |
| std::string bulk_load_path = get_app_bulk_load_path(app->app_id); |
| _meta_svc->get_meta_storage()->delete_node_recursively( |
| std::move(bulk_load_path), [this, app, set_app_not_bulk_loading, bulk_load_path]() { |
| LOG_INFO("remove app({}) bulk load dir {} succeed", app->app_name, bulk_load_path); |
| reset_local_bulk_load_states(app->app_id, app->app_name, true); |
| if (set_app_not_bulk_loading) { |
| update_app_not_bulk_loading_on_remote_storage(std::move(app)); |
| } |
| }); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| template <typename T> |
| inline void erase_map_elem_by_id(int32_t app_id, std::unordered_map<gpid, T> &mymap) |
| { |
| for (auto iter = mymap.begin(); iter != mymap.end();) { |
| if (iter->first.get_app_id() == app_id) { |
| mymap.erase(iter++); |
| } else { |
| iter++; |
| } |
| } |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::reset_local_bulk_load_states_unlocked(int32_t app_id, |
| const std::string &app_name, |
| bool is_reset_result) |
| { |
| _apps_in_progress_count.erase(app_id); |
| _apps_pending_sync_flag.erase(app_id); |
| erase_map_elem_by_id(app_id, _partitions_pending_sync_flag); |
| erase_map_elem_by_id(app_id, _partitions_total_download_progress); |
| _apps_rolling_back.erase(app_id); |
| _apps_rollback_count.erase(app_id); |
| reset_app_ingestion(app_id); |
| _bulk_load_app_id.erase(app_id); |
| |
| if (is_reset_result) { |
| _app_bulk_load_info.erase(app_id); |
| erase_map_elem_by_id(app_id, _partitions_bulk_load_state); |
| erase_map_elem_by_id(app_id, _partition_bulk_load_info); |
| erase_map_elem_by_id(app_id, _partitions_cleaned_up); |
| _apps_cleaning_up.erase(app_id); |
| } |
| |
| LOG_INFO( |
| "reset local app({}) bulk load context, is_reset_result({})", app_name, is_reset_result); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::reset_local_bulk_load_states(int32_t app_id, |
| const std::string &app_name, |
| bool is_reset_result) |
| { |
| zauto_write_lock l(_lock); |
| reset_local_bulk_load_states_unlocked(app_id, app_name, is_reset_result); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::update_app_not_bulk_loading_on_remote_storage( |
| std::shared_ptr<app_state> app) |
| { |
| app_info info = *app; |
| info.__set_is_bulk_loading(false); |
| |
| blob value = dsn::json::json_forwarder<app_info>::encode(info); |
| _meta_svc->get_meta_storage()->set_data( |
| _state->get_app_path(*app), std::move(value), [app, this]() { |
| zauto_write_lock l(app_lock()); |
| app->is_bulk_loading = false; |
| LOG_INFO("app({}) update app is_bulk_loading to false", app->app_name); |
| _meta_svc->unlock_meta_op_status(); |
| }); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::on_control_bulk_load(control_bulk_load_rpc rpc) |
| { |
| const std::string &app_name = rpc.request().app_name; |
| const auto &control_type = rpc.request().type; |
| auto &response = rpc.response(); |
| response.err = ERR_OK; |
| |
| std::shared_ptr<app_state> app = get_app(app_name); |
| if (app == nullptr || app->status != app_status::AS_AVAILABLE) { |
| LOG_ERROR("app({}) is not existed or not available", app_name); |
| response.err = app == nullptr ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED; |
| response.__set_hint_msg(fmt::format("app({}) is not existed or not available", app_name)); |
| return; |
| } |
| |
| if (!app->is_bulk_loading) { |
| LOG_ERROR("app({}) is not executing bulk load", app_name); |
| response.err = ERR_INACTIVE_STATE; |
| response.__set_hint_msg(fmt::format("app({}) is not executing bulk load", app_name)); |
| return; |
| } |
| int32_t app_id = app->app_id; |
| |
| zauto_write_lock l(_lock); |
| const auto &app_status = get_app_bulk_load_status_unlocked(app_id); |
| switch (control_type) { |
| case bulk_load_control_type::BLC_PAUSE: { |
| if (app_status != bulk_load_status::BLS_DOWNLOADING) { |
| auto hint_msg = fmt::format("can not pause bulk load for app({}) with status({})", |
| app_name, |
| dsn::enum_to_string(app_status)); |
| LOG_ERROR("{}", hint_msg); |
| response.err = ERR_INVALID_STATE; |
| response.__set_hint_msg(hint_msg); |
| return; |
| } |
| LOG_INFO("app({}) start to pause bulk load", app_name); |
| update_app_status_on_remote_storage_unlocked(app_id, bulk_load_status::BLS_PAUSING); |
| } break; |
| case bulk_load_control_type::BLC_RESTART: { |
| if (app_status != bulk_load_status::BLS_PAUSED) { |
| auto hint_msg = fmt::format("can not restart bulk load for app({}) with status({})", |
| app_name, |
| dsn::enum_to_string(app_status)); |
| LOG_ERROR("{}", hint_msg); |
| response.err = ERR_INVALID_STATE; |
| response.__set_hint_msg(hint_msg); |
| return; |
| } |
| LOG_INFO("app({}) restart bulk load", app_name); |
| update_app_status_on_remote_storage_unlocked( |
| app_id, bulk_load_status::BLS_DOWNLOADING, ERR_OK, true); |
| } break; |
| case bulk_load_control_type::BLC_CANCEL: |
| if (app_status != bulk_load_status::BLS_DOWNLOADING && |
| app_status != bulk_load_status::BLS_PAUSED) { |
| auto hint_msg = fmt::format("can not cancel bulk load for app({}) with status({})", |
| app_name, |
| dsn::enum_to_string(app_status)); |
| LOG_ERROR("{}", hint_msg); |
| response.err = ERR_INVALID_STATE; |
| response.__set_hint_msg(hint_msg); |
| return; |
| } |
| case bulk_load_control_type::BLC_FORCE_CANCEL: { |
| LOG_INFO("app({}) start to {} cancel bulk load, original status = {}", |
| app_name, |
| control_type == bulk_load_control_type::BLC_FORCE_CANCEL ? "force" : "", |
| dsn::enum_to_string(app_status)); |
| update_app_status_on_remote_storage_unlocked(app_id, |
| bulk_load_status::BLS_CANCELED, |
| ERR_OK, |
| app_status == bulk_load_status::BLS_PAUSED); |
| } break; |
| default: |
| break; |
| } |
| } |
| |
| // ThreadPool: THREAD_POOL_META_SERVER |
| void bulk_load_service::on_query_bulk_load_status(query_bulk_load_rpc rpc) |
| { |
| const auto &request = rpc.request(); |
| const std::string &app_name = request.app_name; |
| |
| query_bulk_load_response &response = rpc.response(); |
| response.err = ERR_OK; |
| response.app_name = app_name; |
| |
| std::shared_ptr<app_state> app = get_app(app_name); |
| if (app == nullptr || app->status != app_status::AS_AVAILABLE) { |
| auto hint_msg = fmt::format("app({}) is not existed or not available", app_name); |
| LOG_ERROR("{}", hint_msg); |
| response.err = (app == nullptr) ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED; |
| response.__set_hint_msg(hint_msg); |
| return; |
| } |
| |
| if (!app->is_bulk_loading) { |
| auto hint_msg = |
| fmt::format("app({}) is not during bulk load, return last time result", app_name); |
| LOG_WARNING("{}", hint_msg); |
| response.__set_hint_msg(hint_msg); |
| } |
| |
| int32_t app_id = app->app_id; |
| int32_t partition_count = app->partition_count; |
| |
| zauto_read_lock l(_lock); |
| response.max_replica_count = app->max_replica_count; |
| response.app_status = get_app_bulk_load_status_unlocked(app_id); |
| |
| response.partitions_status.resize(partition_count); |
| for (const auto &kv : _partition_bulk_load_info) { |
| if (kv.first.get_app_id() == app_id) { |
| response.partitions_status[kv.first.get_partition_index()] = kv.second.status; |
| } |
| } |
| |
| response.bulk_load_states.resize(partition_count); |
| for (const auto &kv : _partitions_bulk_load_state) { |
| if (kv.first.get_app_id() == app_id) { |
| response.bulk_load_states[kv.first.get_partition_index()] = kv.second; |
| } |
| } |
| |
| response.__set_is_bulk_loading(app->is_bulk_loading); |
| |
| if (!app->is_bulk_loading && bulk_load_status::BLS_FAILED == response.app_status) { |
| response.err = get_app_bulk_load_err_unlocked(app_id); |
| } |
| |
| LOG_INFO("query app({}) bulk_load_status({}) succeed", |
| app_name, |
| dsn::enum_to_string(response.app_status)); |
| } |
| |
| void bulk_load_service::on_clear_bulk_load(clear_bulk_load_rpc rpc) |
| { |
| const auto &request = rpc.request(); |
| const std::string &app_name = request.app_name; |
| clear_bulk_load_state_response &response = rpc.response(); |
| |
| std::shared_ptr<app_state> app = get_app(app_name); |
| if (app == nullptr || app->status != app_status::AS_AVAILABLE) { |
| response.err = (app == nullptr) ? ERR_APP_NOT_EXIST : ERR_APP_DROPPED; |
| response.hint_msg = fmt::format("app({}) is not existed or not available", app_name); |
| LOG_ERROR("{}", response.hint_msg); |
| return; |
| } |
| |
| if (app->is_bulk_loading) { |
| response.err = ERR_INVALID_STATE; |
| response.hint_msg = fmt::format("app({}) is executing bulk load", app_name); |
| LOG_ERROR("{}", response.hint_msg); |
| return; |
| } |
| |
| do_clear_app_bulk_load_result(app->app_id, rpc); |
| } |
| |
| void bulk_load_service::do_clear_app_bulk_load_result(int32_t app_id, clear_bulk_load_rpc rpc) |
| { |
| FAIL_POINT_INJECT_F("meta_do_clear_app_bulk_load_result", |
| [rpc](absl::string_view) { rpc.response().err = ERR_OK; }); |
| std::string bulk_load_path = get_app_bulk_load_path(app_id); |
| _meta_svc->get_meta_storage()->delete_node_recursively( |
| std::move(bulk_load_path), [this, app_id, bulk_load_path, rpc]() { |
| clear_bulk_load_state_response &response = rpc.response(); |
| response.err = ERR_OK; |
| response.hint_msg = |
| fmt::format("clear app({}) bulk load result succeed, remove bulk load dir succeed", |
| rpc.request().app_name); |
| reset_local_bulk_load_states(app_id, rpc.request().app_name, true); |
| LOG_INFO("{}", response.hint_msg); |
| }); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_SERVER |
| void bulk_load_service::create_bulk_load_root_dir() |
| { |
| blob value = blob(); |
| std::string path = _bulk_load_root; |
| _sync_bulk_load_storage->create_node(std::move(path), std::move(value), [this]() { |
| LOG_INFO("create bulk load root({}) succeed", _bulk_load_root); |
| sync_apps_from_remote_storage(); |
| }); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::sync_apps_from_remote_storage() |
| { |
| std::string path = _bulk_load_root; |
| _sync_bulk_load_storage->get_children( |
| std::move(path), [this](bool flag, const std::vector<std::string> &children) { |
| if (flag && children.size() > 0) { |
| LOG_INFO("There are {} apps need to sync bulk load status", children.size()); |
| for (const auto &elem : children) { |
| int32_t app_id = boost::lexical_cast<int32_t>(elem); |
| LOG_INFO("start to sync app({}) bulk load status", app_id); |
| do_sync_app(app_id); |
| } |
| } |
| }); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::do_sync_app(int32_t app_id) |
| { |
| std::string app_path = get_app_bulk_load_path(app_id); |
| _sync_bulk_load_storage->get_data(std::move(app_path), [this, app_id](const blob &value) { |
| app_bulk_load_info ainfo; |
| dsn::json::json_forwarder<app_bulk_load_info>::decode(value, ainfo); |
| { |
| zauto_write_lock l(_lock); |
| _bulk_load_app_id.insert(app_id); |
| _app_bulk_load_info[app_id] = ainfo; |
| } |
| sync_partitions_from_remote_storage(ainfo.app_id, ainfo.app_name); |
| }); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::sync_partitions_from_remote_storage(int32_t app_id, |
| const std::string &app_name) |
| { |
| std::string app_path = get_app_bulk_load_path(app_id); |
| _sync_bulk_load_storage->get_children( |
| std::move(app_path), |
| [this, app_path, app_id, app_name](bool flag, const std::vector<std::string> &children) { |
| LOG_INFO("app(name={},app_id={}) has {} partition bulk load info to be synced", |
| app_name, |
| app_id, |
| children.size()); |
| for (const auto &child_pidx : children) { |
| int32_t pidx = boost::lexical_cast<int32_t>(child_pidx); |
| std::string partition_path = get_partition_bulk_load_path(app_path, pidx); |
| do_sync_partition(gpid(app_id, pidx), partition_path); |
| } |
| }); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::do_sync_partition(const gpid &pid, std::string &partition_path) |
| { |
| _sync_bulk_load_storage->get_data(std::move(partition_path), [this, pid](const blob &value) { |
| partition_bulk_load_info pinfo; |
| dsn::json::json_forwarder<partition_bulk_load_info>::decode(value, pinfo); |
| { |
| zauto_write_lock l(_lock); |
| _partition_bulk_load_info[pid] = pinfo; |
| } |
| }); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_SERVER |
| void bulk_load_service::try_to_continue_bulk_load() |
| { |
| FAIL_POINT_INJECT_F("meta_try_to_continue_bulk_load", [](absl::string_view) {}); |
| zauto_read_lock l(_lock); |
| for (const auto app_id : _bulk_load_app_id) { |
| app_bulk_load_info ainfo = _app_bulk_load_info[app_id]; |
| // <partition_index, partition_bulk_load_info> |
| std::unordered_map<int32_t, partition_bulk_load_info> pinfo_map; |
| for (const auto &kv : _partition_bulk_load_info) { |
| if (kv.first.get_app_id() == app_id) { |
| pinfo_map[kv.first.get_partition_index()] = kv.second; |
| } |
| } |
| try_to_continue_app_bulk_load(ainfo, pinfo_map); |
| } |
| } |
| |
| // ThreadPool: THREAD_POOL_META_SERVER |
| void bulk_load_service::try_to_continue_app_bulk_load( |
| const app_bulk_load_info &ainfo, |
| const std::unordered_map<int32_t, partition_bulk_load_info> &pinfo_map) |
| { |
| std::shared_ptr<app_state> app = get_app(ainfo.app_name); |
| // if app is not available, remove bulk load dir |
| if (app == nullptr || app->status != app_status::AS_AVAILABLE) { |
| LOG_ERROR( |
| "app(name={},app_id={}) is not existed or not available", ainfo.app_name, ainfo.app_id); |
| if (app == nullptr) { |
| remove_bulk_load_dir_on_remote_storage(ainfo.app_id, ainfo.app_name); |
| } else { |
| remove_bulk_load_dir_on_remote_storage(std::move(app), true); |
| } |
| return; |
| } |
| |
| // check app bulk load info |
| if (!validate_app(app->app_id, app->partition_count, app->envs, ainfo, pinfo_map.size())) { |
| remove_bulk_load_dir_on_remote_storage(std::move(app), true); |
| return; |
| } |
| |
| // index of the partition whose bulk load status is different from app's bulk load status |
| std::unordered_set<int32_t> different_status_pidx_set; |
| for (const auto &kv : pinfo_map) { |
| if (kv.second.status != ainfo.status) { |
| different_status_pidx_set.insert(kv.first); |
| } |
| } |
| |
| // check partition bulk load info |
| if (!validate_partition(ainfo, pinfo_map, different_status_pidx_set.size())) { |
| remove_bulk_load_dir_on_remote_storage(std::move(app), true); |
| return; |
| } |
| |
| tasking::enqueue(LPC_META_STATE_NORMAL, |
| _meta_svc->tracker(), |
| std::bind(&bulk_load_service::do_continue_app_bulk_load, |
| this, |
| ainfo, |
| pinfo_map, |
| different_status_pidx_set)); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_SERVER |
| /*static*/ bool |
| bulk_load_service::validate_ingest_behind(const std::map<std::string, std::string> &envs, |
| bool ingest_behind) |
| { |
| bool app_allow_ingest_behind = false; |
| const auto &iter = envs.find(replica_envs::ROCKSDB_ALLOW_INGEST_BEHIND); |
| if (iter != envs.end()) { |
| if (!buf2bool(iter->second, app_allow_ingest_behind)) { |
| LOG_WARNING("can not convert {} to bool", iter->second); |
| app_allow_ingest_behind = false; |
| } |
| } |
| if (ingest_behind && !app_allow_ingest_behind) { |
| return false; |
| } |
| return true; |
| } |
| |
| // ThreadPool: THREAD_POOL_META_SERVER |
| /*static*/ bool bulk_load_service::validate_app(int32_t app_id, |
| int32_t partition_count, |
| const std::map<std::string, std::string> &envs, |
| const app_bulk_load_info &ainfo, |
| int32_t pinfo_count) |
| { |
| // app id and partition from `app_bulk_load_info` is inconsistent with current app_info |
| if (app_id != ainfo.app_id || partition_count != ainfo.partition_count) { |
| LOG_ERROR("app({}) has different app_id or partition_count, bulk load app_id = {}, " |
| "partition_count = {}, current app_id = {}, partition_count = {}", |
| ainfo.app_name, |
| ainfo.app_id, |
| ainfo.partition_count, |
| app_id, |
| partition_count); |
| return false; |
| } |
| |
| // partition_bulk_load_info count should not be greater than partition_count |
| if (partition_count < pinfo_count) { |
| LOG_ERROR("app({}) has invalid count, app partition_count = {}, remote " |
| "partition_bulk_load_info count = {}", |
| ainfo.app_name, |
| partition_count, |
| pinfo_count); |
| return false; |
| } |
| |
| // partition_bulk_load_info count is not equal to partition_count can only be happended when app |
| // status is downloading, consider the following condition: |
| // when starting bulk load, meta server will create app_bulk_load_dir and |
| // partition_bulk_load_dir on remote storage |
| // however, meta server crash when create app directory and part of partition directory |
| // when meta server recover, partition directory count is less than partition_count |
| if (pinfo_count != partition_count && ainfo.status != bulk_load_status::BLS_DOWNLOADING) { |
| LOG_ERROR("app({}) bulk_load_status = {}, but there are {} partitions lack " |
| "partition_bulk_load dir", |
| ainfo.app_name, |
| dsn::enum_to_string(ainfo.status), |
| partition_count - pinfo_count); |
| return false; |
| } |
| |
| if (!validate_ingest_behind(envs, ainfo.ingest_behind)) { |
| LOG_ERROR("app({}) has inconsistent ingest_behind option", ainfo.app_name); |
| return false; |
| } |
| |
| return true; |
| } |
| |
| // ThreadPool: THREAD_POOL_META_SERVER |
| /*static*/ bool bulk_load_service::validate_partition( |
| const app_bulk_load_info &ainfo, |
| const std::unordered_map<int32_t, partition_bulk_load_info> &pinfo_map, |
| const int32_t different_status_count) |
| { |
| const auto app_status = ainfo.status; |
| bool is_valid = true; |
| |
| switch (app_status) { |
| case bulk_load_status::BLS_DOWNLOADING: |
| // if app status is downloading, partition status has no limit, because when bulk load meet |
| // recoverable errors, will rollback to downloading |
| // partition directory count is allowed less than partition_count, but it is impossible with |
| // some partition bulk load status is not downloading and some partition directroy is |
| // missing on remote storage |
| if (ainfo.partition_count - pinfo_map.size() > 0 && different_status_count > 0) { |
| LOG_ERROR("app({}) bulk_load_status = {}, there are {} partitions status is different " |
| "from app, and {} partitions not existed, this is invalid", |
| ainfo.app_name, |
| dsn::enum_to_string(app_status), |
| different_status_count, |
| ainfo.partition_count - pinfo_map.size()); |
| is_valid = false; |
| } |
| break; |
| case bulk_load_status::BLS_DOWNLOADED: |
| case bulk_load_status::BLS_INGESTING: { |
| // if app status is downloaded, valid partition status is downloaded or ingesting |
| // if app status is ingesting, valid partition status is ingesting or succeed |
| const auto other_valid_status = (app_status == bulk_load_status::BLS_DOWNLOADED) |
| ? bulk_load_status::BLS_INGESTING |
| : bulk_load_status::BLS_SUCCEED; |
| for (const auto &kv : pinfo_map) { |
| if (kv.second.status != app_status && kv.second.status != other_valid_status) { |
| LOG_ERROR("app({}) bulk_load_status = {}, but partition[{}] bulk_load_status = {}, " |
| "only {} and {} is valid", |
| ainfo.app_name, |
| app_status, |
| kv.first, |
| dsn::enum_to_string(kv.second.status), |
| dsn::enum_to_string(app_status), |
| dsn::enum_to_string(other_valid_status)); |
| is_valid = false; |
| break; |
| } |
| } |
| } break; |
| case bulk_load_status::BLS_SUCCEED: |
| case bulk_load_status::BLS_PAUSED: |
| // if app status is succeed or paused, all partitions' status should not be different from |
| // app's |
| if (different_status_count > 0) { |
| LOG_ERROR("app({}) bulk_load_status = {}, {} partitions status is different from app, " |
| "this is invalid", |
| ainfo.app_name, |
| dsn::enum_to_string(app_status), |
| different_status_count); |
| is_valid = false; |
| } |
| break; |
| default: |
| // for other status, partition status has no limit |
| break; |
| } |
| |
| return is_valid; |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::do_continue_app_bulk_load( |
| const app_bulk_load_info &ainfo, |
| const std::unordered_map<int32_t, partition_bulk_load_info> &pinfo_map, |
| const std::unordered_set<int32_t> &different_status_pidx_set) |
| { |
| const int32_t app_id = ainfo.app_id; |
| const int32_t partition_count = ainfo.partition_count; |
| const auto app_status = ainfo.status; |
| const int32_t different_count = different_status_pidx_set.size(); |
| const int32_t same_count = pinfo_map.size() - different_count; |
| const int32_t invalid_count = partition_count - pinfo_map.size(); |
| |
| if (!FLAGS_enable_concurrent_bulk_load && |
| !_meta_svc->try_lock_meta_op_status(meta_op_status::BULKLOAD)) { |
| LOG_ERROR("fatal, the op status of meta server must be meta_op_status::FREE"); |
| return; |
| } |
| LOG_INFO( |
| "app({}) continue bulk load, app_id = {}, partition_count = {}, status = {}, there are {} " |
| "partitions have bulk_load_info, {} partitions have same status with app, {} " |
| "partitions different", |
| ainfo.app_name, |
| app_id, |
| partition_count, |
| dsn::enum_to_string(app_status), |
| pinfo_map.size(), |
| same_count, |
| different_count); |
| |
| // _apps_in_progress_count is used for updating app bulk load, when _apps_in_progress_count = 0 |
| // means app bulk load status can transfer to next stage, for example, when app status is |
| // downloaded, and _apps_in_progress_count = 0, app status can turn to ingesting |
| // see more in function `update_partition_info_on_remote_storage_reply` |
| int32_t in_progress_partition_count = partition_count; |
| if (app_status == bulk_load_status::BLS_DOWNLOADING) { |
| if (invalid_count > 0) { |
| // create missing partition, so the in_progress_count should be invalid_count |
| in_progress_partition_count = invalid_count; |
| } else if (different_count > 0) { |
| // it is hard to distinguish that bulk load is normal downloading or rollback to |
| // downloading before meta server crash, when app status is downloading, we consider |
| // bulk load as rolling back to downloading for convenience, for partitions whose status |
| // is not downloading, update them to downloading, so the in_progress_count should be |
| // different_count |
| in_progress_partition_count = different_count; |
| } |
| } else if (app_status == bulk_load_status::BLS_DOWNLOADED || |
| app_status == bulk_load_status::BLS_INGESTING || |
| app_status == bulk_load_status::BLS_SUCCEED) { |
| // for app status is downloaded, when all partitions turn to ingesting, app partition will |
| // turn to ingesting, so the in_progress_count should be same_count, ingesting and succeed |
| // are same |
| in_progress_partition_count = same_count; |
| } // for other cases, in_progress_count should be partition_count |
| { |
| zauto_write_lock l(_lock); |
| _apps_in_progress_count[app_id] = in_progress_partition_count; |
| _apps_rollback_count[app_id] = 0; |
| } |
| |
| // if app is paused, no need to send bulk_load_request, just return |
| if (app_status == bulk_load_status::BLS_PAUSED) { |
| return; |
| } |
| |
| // create all missing partitions then send request to all partitions |
| if (app_status == bulk_load_status::BLS_DOWNLOADING && invalid_count > 0) { |
| for (auto i = 0; i < partition_count; ++i) { |
| if (pinfo_map.find(i) == pinfo_map.end()) { |
| create_missing_partition_dir(ainfo.app_name, gpid(app_id, i), partition_count); |
| } |
| } |
| return; |
| } |
| |
| // update all partition status to app_status |
| if ((app_status == bulk_load_status::BLS_FAILED || |
| app_status == bulk_load_status::BLS_CANCELED || |
| app_status == bulk_load_status::BLS_PAUSING || |
| app_status == bulk_load_status::BLS_DOWNLOADING) && |
| different_count > 0) { |
| for (auto pidx : different_status_pidx_set) { |
| update_partition_info_on_remote_storage(ainfo.app_name, gpid(app_id, pidx), app_status); |
| } |
| } |
| |
| // send bulk_load_request to all partitions |
| for (auto i = 0; i < partition_count; ++i) { |
| gpid pid = gpid(app_id, i); |
| partition_bulk_load(ainfo.app_name, pid); |
| if (app_status == bulk_load_status::BLS_INGESTING) { |
| partition_ingestion(ainfo.app_name, pid); |
| } |
| } |
| } |
| |
| // ThreadPool: THREAD_POOL_META_STATE |
| void bulk_load_service::create_missing_partition_dir(const std::string &app_name, |
| const gpid &pid, |
| int32_t partition_count) |
| { |
| partition_bulk_load_info pinfo; |
| pinfo.status = bulk_load_status::BLS_DOWNLOADING; |
| blob value = dsn::json::json_forwarder<partition_bulk_load_info>::encode(pinfo); |
| |
| _meta_svc->get_meta_storage()->create_node( |
| get_partition_bulk_load_path(pid), |
| std::move(value), |
| [app_name, pid, partition_count, pinfo, this]() { |
| const int32_t app_id = pid.get_app_id(); |
| bool send_request = false; |
| LOG_INFO("app({}) create partition({}) bulk_load_info", app_name, pid); |
| { |
| zauto_write_lock l(_lock); |
| _partition_bulk_load_info[pid] = pinfo; |
| |
| if (--_apps_in_progress_count[app_id] == 0) { |
| _apps_in_progress_count[app_id] = partition_count; |
| send_request = true; |
| } |
| } |
| if (send_request) { |
| LOG_INFO("app({}) start to bulk load", app_name); |
| for (auto i = 0; i < partition_count; ++i) { |
| partition_bulk_load(app_name, gpid(app_id, i)); |
| } |
| } |
| }); |
| } |
| |
| // ThreadPool: THREAD_POOL_META_SERVER |
| void bulk_load_service::check_app_bulk_load_states(std::shared_ptr<app_state> app, |
| bool is_app_bulk_loading) |
| { |
| std::string app_path = get_app_bulk_load_path(app->app_id); |
| _meta_svc->get_remote_storage()->node_exist( |
| app_path, LPC_META_CALLBACK, [this, app_path, app, is_app_bulk_loading](error_code err) { |
| if (err != ERR_OK && err != ERR_OBJECT_NOT_FOUND) { |
| LOG_WARNING("check app({}) bulk load dir({}) failed, error = {}, try later", |
| app->app_name, |
| app_path, |
| err); |
| tasking::enqueue(LPC_META_CALLBACK, |
| nullptr, |
| std::bind(&bulk_load_service::check_app_bulk_load_states, |
| this, |
| app, |
| is_app_bulk_loading), |
| 0, |
| std::chrono::seconds(1)); |
| return; |
| } |
| |
| if (err == ERR_OBJECT_NOT_FOUND && is_app_bulk_loading) { |
| LOG_ERROR("app({}): bulk load dir({}) not exist, but is_bulk_loading = {}, reset " |
| "app is_bulk_loading flag", |
| app->app_name, |
| app_path, |
| is_app_bulk_loading); |
| update_app_not_bulk_loading_on_remote_storage(std::move(app)); |
| return; |
| } |
| |
| // Normal cases: |
| // err = ERR_OBJECT_NOT_FOUND, is_app_bulk_load = false: app is not executing bulk load |
| // err = ERR_OK, is_app_bulk_load = true: app used to be executing bulk load |
| }); |
| } |
| |
| } // namespace replication |
| } // namespace dsn |