blob: 359b3fa08510e4ffb767615c36cbb8e4a3683f54 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <ostream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include "bulk_load_types.h"
#include "common/bulk_load_common.h"
#include "common/gpid.h"
#include "common/json_helper.h"
#include "common/replication_other_types.h"
#include "meta/meta_state_service_utils.h"
#include "meta_bulk_load_ingestion_context.h"
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/task/task_tracker.h"
#include "server_state.h"
#include "utils/error_code.h"
#include "utils/flags.h"
#include "utils/zlocks.h"
namespace dsn {
class partition_configuration;
namespace replication {
class app_state;
class config_context;
class meta_service;
/// bulk load path on remote storage:
/// <cluster_root>/bulk_load/<app_id> -> app_bulk_load_info
/// <cluster_root>/bulk_load/<app_id>/<pidx> -> partition_bulk_load_info
struct app_bulk_load_info
int32_t app_id;
int32_t partition_count;
std::string app_name;
std::string cluster_name;
std::string file_provider_type;
bulk_load_status::type status;
std::string remote_root_path;
bool ingest_behind;
bool is_ever_ingesting;
error_code bulk_load_err;
struct partition_bulk_load_info
bulk_load_status::type status;
bulk_load_metadata metadata;
bool ever_ingest_succeed;
std::vector<host_port> host_ports;
DEFINE_JSON_SERIALIZATION(status, metadata, ever_ingest_succeed, host_ports)
// Used for remote file provider
struct bulk_load_info
int32_t app_id;
std::string app_name;
int32_t partition_count;
bulk_load_info(int32_t id = 0, const std::string &name = "", int32_t pcount = 0)
: app_id(id), app_name(name), partition_count(pcount)
DEFINE_JSON_SERIALIZATION(app_id, app_name, partition_count)
/// Bulk load process:
/// when client sent `start_bulk_load_rpc` to meta server to start bulk load,
/// meta server create bulk load structures on remote storage, and send `RPC_BULK_LOAD` rpc to
/// each primary replica periodically until bulk load succeed or failed. whole process below:
/// start bulk load
/// |
/// v
/// remove previous bulk load info on remote storage
/// |
/// v
/// is_bulk_loading = true
/// |
/// v
/// create bulk load info on remote storage
/// |
/// Err v
/// ---------Downloading <---------|
/// | Too many | |
/// | rollback | |
/// | v Err |
/// | Downloaded --------->|
/// | | |
/// | IngestErr v Err |
/// |<------- Ingesting --------->|
/// | |
/// v v
/// Failed Succeed
/// | |
/// | v
/// |---> is_bulk_loading = false
/// |
/// v
/// bulk load end
class bulk_load_service
explicit bulk_load_service(meta_service *meta_svc, const std::string &bulk_load_dir);
void initialize_bulk_load_service();
// client -> meta server to start bulk load
void on_start_bulk_load(start_bulk_load_rpc rpc);
// client -> meta server to pause/restart/cancel/force_cancel bulk load
void on_control_bulk_load(control_bulk_load_rpc rpc);
// client -> meta server to query bulk load status
void on_query_bulk_load_status(query_bulk_load_rpc rpc);
// client -> meta server to clear bulk load state
void on_clear_bulk_load(clear_bulk_load_rpc rpc);
// Called by `sync_apps_from_remote_storage`, check bulk load state consistency
// Handle inconsistent conditions below:
// - app is_bulk_loading = true, app_bulk_load_info not existed, set is_bulk_loading=false
// - app is_bulk_loading = false, app_bulk_load_info existed, remove useless app bulk load on
// remote storage
void check_app_bulk_load_states(std::shared_ptr<app_state> app, bool is_app_bulk_loading);
// Called by `on_start_bulk_load`, check request params
// - ERR_OK: pass params check
// - ERR_INVALID_PARAMETERS: wrong file_provider type
// - ERR_FILE_OPERATION_FAILED: file_provider error
// - ERR_OBJECT_NOT_FOUND: bulk_load_info not exist, may wrong cluster_name or app_name
// - ERR_CORRUPTION: bulk_load_info is damaged on file_provider
// - ERR_INCONSISTENT_STATE: app_id or partition_count inconsistent
error_code check_bulk_load_request_params(const start_bulk_load_request &request,
const int32_t app_id,
const int32_t partition_count,
const std::map<std::string, std::string> &envs,
std::string &hint_msg);
void do_start_app_bulk_load(std::shared_ptr<app_state> app, start_bulk_load_rpc rpc);
void do_clear_app_bulk_load_result(int32_t app_id, clear_bulk_load_rpc rpc);
// Called by `partition_bulk_load` and `partition_ingestion`
// check partition status before sending partition_bulk_load_request and
// partition_ingestion_request
bool check_partition_status(
const std::string &app_name,
const gpid &pid,
bool always_unhealthy_check,
const std::function<void(const std::string &, const gpid &)> &retry_function,
/*out*/ partition_configuration &pconfig);
void partition_bulk_load(const std::string &app_name, const gpid &pid);
void on_partition_bulk_load_reply(error_code err,
const bulk_load_request &request,
const bulk_load_response &response);
// if app is still in bulk load, resend bulk_load_request to primary after interval seconds
void try_resend_bulk_load_request(const std::string &app_name, const gpid &pid);
void handle_app_downloading(const bulk_load_response &response, const host_port &primary_addr);
void handle_app_ingestion(const bulk_load_response &response, const host_port &primary_addr);
// when app status is `succeed, `failed`, `canceled`, meta and replica should cleanup bulk load
// states
void handle_bulk_load_finish(const bulk_load_response &response, const host_port &primary_addr);
void handle_app_pausing(const bulk_load_response &response, const host_port &primary_addr);
// app not existed or not available during bulk load
void handle_app_unavailable(int32_t app_id, const std::string &app_name);
void try_rollback_to_downloading(const std::string &app_name, const gpid &pid);
void handle_bulk_load_failed(int32_t app_id, error_code err);
// Called when app bulk load status update to ingesting
// create ingestion_request and send it to primary
void partition_ingestion(const std::string &app_name, const gpid &pid);
void send_ingestion_request(const std::string &app_name,
const gpid &pid,
const host_port &primary_addr,
const ballot &meta_ballot);
void on_partition_ingestion_reply(error_code err,
const ingestion_response &&resp,
const std::string &app_name,
const gpid &pid,
const host_port &primary_addr);
// Called by `partition_ingestion`
// - true : this partition has ever executed ingestion succeed, no need to send ingestion
// request
// - false: this partition has not executed ingestion or executed ingestion failed
bool check_ever_ingestion_succeed(const partition_configuration &config,
const std::string &app_name,
const gpid &pid);
// is_reset_all
// - true : reset all states in memory
// - false : keep the bulk load results in memory, reset others
void reset_local_bulk_load_states_unlocked(int32_t app_id,
const std::string &app_name,
bool is_reset_all);
reset_local_bulk_load_states(int32_t app_id, const std::string &app_name, bool is_reset_all);
/// ingestion_context functions
bool try_partition_ingestion(const partition_configuration &config, const config_context &cc)
return _ingestion_context->try_partition_ingestion(config, cc);
void finish_ingestion(const gpid &pid) { _ingestion_context->remove_partition(pid); }
const int32_t get_app_ingesting_count(const int32_t app_id) const
return _ingestion_context->get_app_ingesting_count(app_id);
void reset_app_ingestion(const int32_t app_id) { _ingestion_context->reset_app(app_id); }
/// update bulk load states to remote storage functions
void create_app_bulk_load_dir(const std::string &app_name,
int32_t app_id,
int32_t partition_count,
start_bulk_load_rpc rpc);
void create_partition_bulk_load_dir(const std::string &app_name,
const gpid &pid,
int32_t partition_count,
start_bulk_load_rpc rpc);
// Called by `handle_app_downloading`
// update partition bulk load metadata reported by replica server on remote storage
void update_partition_metadata_on_remote_storage(const std::string &app_name,
const gpid &pid,
const bulk_load_metadata &metadata);
// update partition bulk load info on remote storage
// if should_send_request = true, will send bulk load request after update local partition
// status, this parameter will be true when restarting bulk load, status will turn from paused
// to downloading
void update_partition_info_on_remote_storage(const std::string &app_name,
const gpid &pid,
bulk_load_status::type new_status,
bool should_send_request = false);
void update_partition_info_unlock(const gpid &pid,
bulk_load_status::type new_status,
/*out*/ partition_bulk_load_info &pinfo);
void update_partition_info_on_remote_storage_reply(const std::string &app_name,
const gpid &pid,
const partition_bulk_load_info &new_info,
bool should_send_request);
// update app bulk load status on remote storage
void update_app_status_on_remote_storage_unlocked(int32_t app_id,
bulk_load_status::type new_status,
error_code err = ERR_OK,
bool should_send_request = false);
void update_app_status_on_remote_storage_reply(const app_bulk_load_info &ainfo,
bulk_load_status::type old_status,
bulk_load_status::type new_status,
bool should_send_request);
// called when app is not available or dropped during bulk load, remove bulk load directory on
// remote storage
void remove_bulk_load_dir_on_remote_storage(int32_t app_id, const std::string &app_name);
// called when app is available, remove bulk load directory on remote storage
// if `set_app_not_bulk_loading` = true: call function
// `update_app_not_bulk_loading_on_remote_storage` to set app not bulk_loading after removing
void remove_bulk_load_dir_on_remote_storage(std::shared_ptr<app_state> app,
bool set_app_not_bulk_loading);
// update app's is_bulk_loading to false on remote_storage
void update_app_not_bulk_loading_on_remote_storage(std::shared_ptr<app_state> app);
/// sync bulk load states from remote storage
/// called when service initialized or meta server leader switch
void create_bulk_load_root_dir();
void sync_apps_from_remote_storage();
void do_sync_app(int32_t app_id);
void sync_partitions_from_remote_storage(int32_t app_id, const std::string &app_name);
void do_sync_partition(const gpid &pid, std::string &partition_path);
/// try to continue bulk load according to states from remote storage
/// called when service initialized or meta server leader switch
void try_to_continue_bulk_load();
void try_to_continue_app_bulk_load(
const app_bulk_load_info &ainfo,
const std::unordered_map<int32_t, partition_bulk_load_info> &partition_map);
static bool validate_ingest_behind(const std::map<std::string, std::string> &envs,
bool ingest_behind);
static bool validate_app(int32_t app_id,
int32_t partition_count,
const std::map<std::string, std::string> &envs,
const app_bulk_load_info &ainfo,
int32_t pinfo_count);
static bool
validate_partition(const app_bulk_load_info &ainfo,
const std::unordered_map<int32_t, partition_bulk_load_info> &pinfo_map,
const int32_t different_status_count);
void do_continue_app_bulk_load(
const app_bulk_load_info &ainfo,
const std::unordered_map<int32_t, partition_bulk_load_info> &pinfo_map,
const std::unordered_set<int32_t> &different_status_pidx_set);
// called by `do_continue_app_bulk_load`
// only used when app status is downloading and some partition bulk load info not existed on
// remote storage
void create_missing_partition_dir(const std::string &app_name,
const gpid &pid,
int32_t partition_count);
/// helper functions
inline std::shared_ptr<app_state> get_app(const std::string &name)
zauto_read_lock l(app_lock());
return _state->get_app(name);
inline std::shared_ptr<app_state> get_app(int32_t app_id)
zauto_read_lock l(app_lock());
return _state->get_app(app_id);
// get bulk_load_info path on file provider
// <remote_root_path>/<cluster_name>/<app_name>/bulk_load_info
inline std::string get_bulk_load_info_path(const std::string &app_name,
const std::string &cluster_name,
const std::string &remote_root_path) const
std::ostringstream oss;
oss << remote_root_path << "/" << cluster_name << "/" << app_name << "/"
<< bulk_load_constant::BULK_LOAD_INFO;
return oss.str();
// get app_bulk_load_info path on remote storage
// <_bulk_load_root>/<app_id>
inline std::string get_app_bulk_load_path(int32_t app_id) const
std::stringstream oss;
oss << _bulk_load_root << "/" << app_id;
return oss.str();
// get partition_bulk_load_info path on remote storage
// <_bulk_load_root>/<app_id>/<partition_id>
inline std::string get_partition_bulk_load_path(const std::string &app_bulk_load_path,
int partition_id) const
std::stringstream oss;
oss << app_bulk_load_path << "/" << partition_id;
return oss.str();
inline std::string get_partition_bulk_load_path(const gpid &pid) const
std::stringstream oss;
oss << get_app_bulk_load_path(pid.get_app_id()) << "/" << pid.get_partition_index();
return oss.str();
inline bool is_partition_metadata_not_updated(gpid pid)
zauto_read_lock l(_lock);
return is_partition_metadata_not_updated_unlocked(pid);
inline bool is_partition_metadata_not_updated_unlocked(gpid pid) const
const auto &iter = _partition_bulk_load_info.find(pid);
if (iter == _partition_bulk_load_info.end()) {
return false;
const auto &metadata = iter->second.metadata;
return (metadata.files.size() == 0 && metadata.file_total_size == 0);
inline bulk_load_status::type get_partition_bulk_load_status_unlocked(gpid pid) const
const auto &iter = _partition_bulk_load_info.find(pid);
if (iter != _partition_bulk_load_info.end()) {
return iter->second.status;
} else {
return bulk_load_status::BLS_INVALID;
inline bulk_load_status::type get_app_bulk_load_status(int32_t app_id)
zauto_read_lock l(_lock);
return get_app_bulk_load_status_unlocked(app_id);
inline bulk_load_status::type get_app_bulk_load_status_unlocked(int32_t app_id) const
const auto &iter = _app_bulk_load_info.find(app_id);
if (iter != _app_bulk_load_info.end()) {
return iter->second.status;
} else {
return bulk_load_status::BLS_INVALID;
inline error_code get_app_bulk_load_err_unlocked(int32_t app_id) const
const auto &iter = _app_bulk_load_info.find(app_id);
if (iter != _app_bulk_load_info.end()) {
return iter->second.bulk_load_err;
} else {
return ERR_OK;
inline bool is_app_bulk_loading_unlocked(int32_t app_id) const
return (_bulk_load_app_id.find(app_id) != _bulk_load_app_id.end());
friend class bulk_load_service_test;
friend class meta_bulk_load_http_test;
meta_service *_meta_svc;
server_state *_state;
std::unique_ptr<mss::meta_storage> _sync_bulk_load_storage;
std::unique_ptr<ingestion_context> _ingestion_context;
task_tracker _sync_tracker;
zrwlock_nr &app_lock() const { return _state->_lock; }
zrwlock_nr _lock; // bulk load states lock
const std::string _bulk_load_root; // <cluster_root>/bulk_load
/// bulk load states
std::unordered_set<int32_t> _bulk_load_app_id;
std::unordered_map<app_id, app_bulk_load_info> _app_bulk_load_info;
std::unordered_map<app_id, int32_t> _apps_in_progress_count;
std::unordered_map<app_id, bool> _apps_pending_sync_flag;
std::unordered_map<gpid, partition_bulk_load_info> _partition_bulk_load_info;
std::unordered_map<gpid, bool> _partitions_pending_sync_flag;
// partition_index -> group total download progress
std::unordered_map<gpid, int32_t> _partitions_total_download_progress;
// partition_index -> group bulk load states(node address -> state)
std::unordered_map<gpid, std::map<host_port, partition_bulk_load_state>>
std::unordered_map<gpid, bool> _partitions_cleaned_up;
// Used for bulk load failed and app unavailable to avoid duplicated clean up
std::unordered_map<app_id, bool> _apps_cleaning_up;
// Used for bulk load rolling back to downloading
std::unordered_map<app_id, bool> _apps_rolling_back;
// Used for restrict bulk load rollback count
std::unordered_map<app_id, int32_t> _apps_rollback_count;
} // namespace replication
} // namespace dsn