blob: f767ad2be3625f2285cd3aeb2e6f0afb868e3cf1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <gtest/gtest_prod.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <functional>
#include <iomanip> // std::setfill, std::setw
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <vector>
#include "backup_types.h"
#include "common/gpid.h"
#include "common/json_helper.h"
#include "common/replication_other_types.h"
#include "meta_rpc_types.h"
#include "runtime/task/task.h"
#include "runtime/task/task_tracker.h"
#include "utils/api_utilities.h"
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/fmt_utils.h"
#include "utils/metrics.h"
#include "utils/ports.h"
#include "utils/zlocks.h"
namespace dsn {
// Forward declarations: within this header these types are only named through pointers and
// references, so their full definitions are not needed here.
class message_ex;
class host_port;
namespace dist {
namespace block_service {
// Forward-declared for the block_filesystem pointer member of policy_context.
class block_filesystem;
} // namespace block_service
} // namespace dist
namespace replication {
// Forward declarations of types this header refers to through raw pointers or
// std::shared_ptr before (or without) seeing their definitions.
class backup_engine;
class backup_service;
class meta_service;
class server_state;
// Lifecycle state of a backup_info record.
// The enum is nested in a struct, so its values are spelled backup_info_status::ALIVE and
// backup_info_status::DELETING.
struct backup_info_status
{
    enum type
    {
        // backup info is preserved
        ALIVE = 1,
        // backup info is being deleted: first check that the backup checkpoint is fully
        // removed from the backup media, then remove the backup_info on remote storage
        DELETING = 2,
    };
};
// Metadata describing one backup instance.
struct backup_info
{
    int64_t backup_id = 0;
    int64_t start_time_ms = 0;
    int64_t end_time_ms = 0;
    // "app_ids" is copied from policy.app_ids when a new backup is generated.
    // The policy's app set may change afterwards, but backup_info.app_ids never changes.
    std::set<int32_t> app_ids;
    std::map<int32_t, std::string> app_names;
    // Kept as a plain integer; holds a backup_info_status::type value.
    int32_t info_status = backup_info_status::ALIVE;

    // All fields start from the default member initializers above.
    backup_info() {}

    // Returns info_status converted to the backup_info_status enum.
    backup_info_status::type get_backup_status() const
    {
        return static_cast<backup_info_status::type>(info_status);
    }

    DEFINE_JSON_SERIALIZATION(
        backup_id, start_time_ms, end_time_ms, app_ids, app_names, info_status)
};
// Time of day at which a backup policy is allowed to start a backup.
//
// Attention: a start time of 24:00 means "no limit on the start time"; 24:00 is mainly
// reserved for testing.
//
// Currently only hour precision is supported: the minute is parsed and stored, but
// should_start_backup() ignores it.
struct backup_start_time
{
    int32_t hour;   // [0, 24]; 24 is only valid together with minute == 0 ("no limit")
    int32_t minute; // [0, 60)

    backup_start_time() : hour(0), minute(0) {}
    backup_start_time(int32_t h, int32_t m) : hour(h), minute(m) {}

    // Formats the time as "HH:MM", each part zero-padded to two characters.
    std::string to_string() const
    {
        std::stringstream ss;
        // setfill is sticky, setw only applies to the next insertion.
        ss << std::setfill('0') << std::setw(2) << std::to_string(hour) << ":" << std::setw(2)
           << std::to_string(minute);
        return ss.str();
    }

    friend std::ostream &operator<<(std::ostream &os, const backup_start_time &t)
    {
        return os << t.to_string();
    }

    // Parses a "<hour>:<minute>" string (anything following the minute is ignored by sscanf).
    //
    // Returns true and stores the parsed values when the time is valid, i.e. 00:00 ~ 23:59
    // or exactly 24:00.
    // NOTICE: if the time is invalid (not parseable, negative or out of range), this function
    // sets hour = 24, minute = 0 and returns false.
    bool parse_from(const std::string &time)
    {
        // Parse into locals so that a partially matched input never leaves this object
        // half-updated.
        int parsed_hour = 0;
        int parsed_minute = 0;
        const bool well_formed =
            ::sscanf(time.c_str(), "%d:%d", &parsed_hour, &parsed_minute) == 2;
        const bool in_range = well_formed && parsed_hour >= 0 && parsed_minute >= 0 &&
                              parsed_minute < 60 &&
                              (parsed_hour < 24 || (parsed_hour == 24 && parsed_minute == 0));
        if (!in_range) {
            hour = 24;
            minute = 0;
            return false;
        }
        hour = parsed_hour;
        minute = parsed_minute;
        return true;
    }

    // Returns the interval between new_hour:new_min and this start time,
    // namely new_hour:new_min - start_time; the unit is ms (negative when the new time is
    // earlier than the start time).
    int64_t compute_time_drift_ms(int32_t new_hour, int32_t new_min) const
    {
        const int64_t drift_minutes =
            static_cast<int64_t>(new_hour - hour) * 60 + (new_min - minute);
        return drift_minutes * 60 * 1000;
    }

    // Judges whether a backup should be started at the current time.
    bool should_start_backup(int32_t cur_hour, int32_t cur_min) const
    {
        if (hour == 24) {
            // 24:00 lifts the start-time restriction, just for testing
            return true;
        }
        // NOTICE: cur_min is deliberately ignored for now; use it here if minute precision
        // is ever needed.
        return cur_hour == hour;
    }

    DEFINE_JSON_SERIALIZATION(hour, minute)
};
// Metrics attached to one backup policy: the policy name, a metric entity pointer and one
// int64 gauge named backup_recent_duration_ms.
class backup_policy_metrics
{
public:
// Default construction: the policy name is an empty string and the other members are
// default-constructed.
backup_policy_metrics() = default;
// Builds the metrics for the policy called `policy_name`; defined out of line.
// NOTE(review): this single-argument constructor is not `explicit`, so a std::string
// converts implicitly to backup_policy_metrics — confirm that this is intended.
backup_policy_metrics(const std::string &policy_name);
// Accessor returning a metric entity pointer by const reference (presumably
// _backup_policy_metric_entity; the definition is out of line).
const metric_entity_ptr &backup_policy_metric_entity() const;
// Presumably generates a setter for the backup_recent_duration_ms gauge that takes an
// int64_t — confirm against the macro definition (utils/metrics.h is included above).
METRIC_DEFINE_SET(backup_recent_duration_ms, int64_t)
private:
// Both members are const: they are fixed at construction time.
const std::string _policy_name;
const metric_entity_ptr _backup_policy_metric_entity;
// Declares the gauge variable; it is declared after the entity member, so it is also
// initialized after it.
METRIC_VAR_DECLARE_gauge_int64(backup_recent_duration_ms);
// Presumably deletes the copy constructor and copy assignment — confirm in utils/ports.h.
DISALLOW_COPY_AND_ASSIGN(backup_policy_metrics);
};
//
// The backup process of the meta server:
// 1, write the app metadata to the block filesystem
// 2, periodically tell the primary of each partition to start backup, until the app finishes
// its backup
// 3, receive the backup response from each primary to judge whether the backup is finished
// 4, when an app finishes its backup, write a flag to the block filesystem (a file named
// app_backup_status) to mark that the app has finished its backup
// 5, when the policy finishes its backup, write the backup information (backup_info) to the
// block filesystem
// 6, the backup is finished; just wait to start another backup
//
// A backup policy: extends policy_info with the set of apps to back up and the scheduling
// parameters. policy_name and backup_provider_type are not declared here — presumably they
// are inherited from policy_info.
class policy : public policy_info
{
public:
    std::set<int32_t> app_ids;
    std::map<int32_t, std::string> app_names;
    int64_t backup_interval_seconds = 0;
    int32_t backup_history_count_to_keep = 6;
    bool is_disable = false;
    // default is 24:00, namely no limit
    backup_start_time start_time{24, 0};

    // Every field starts from its default member initializer above.
    policy() {}

    DEFINE_JSON_SERIALIZATION(policy_name,
                              backup_provider_type,
                              app_ids,
                              app_names,
                              backup_interval_seconds,
                              backup_history_count_to_keep,
                              is_disable,
                              start_time)
};
// Bookkeeping for the progress of a backup, per app and per partition.
struct backup_progress
{
    int32_t unfinished_apps = 0;
    std::map<gpid, int32_t> partition_progress;
    std::map<app_id, int32_t> unfinished_partitions_per_app;
    // <app_id, <partition_id, checkpoint size>>
    std::map<app_id, std::map<int, int64_t>> app_chkpt_size;
    // if an app is dropped when starting a new backup or while a backup is running, the
    // backup of this app is simply skipped
    std::map<app_id, bool> is_app_skipped;

    // Starts with zero unfinished apps and empty maps.
    backup_progress() {}

    // Puts the object back into its freshly-constructed state.
    void reset()
    {
        is_app_skipped.clear();
        app_chkpt_size.clear();
        unfinished_partitions_per_app.clear();
        partition_progress.clear();
        unfinished_apps = 0;
    }
};
// JSON-serializable record carrying the total checkpoint size (presumably the payload of the
// per-app "backup finished" flag file — confirm against the writer).
struct backup_flag
{
    // Zero-initialized so that a default-constructed flag never exposes an indeterminate value.
    int64_t total_checkpoint_size = 0;
    DEFINE_JSON_SERIALIZATION(total_checkpoint_size)
};
// Holds the state of a single backup policy: the policy itself, the backup currently in
// progress together with its progress, and the history of previous backups.
class policy_context
{
public:
    // _is_backup_failed is initialized explicitly so that it never holds an indeterminate
    // value before the first backup is prepared.
    explicit policy_context(backup_service *service)
        : _backup_service(service), _block_service(nullptr), _is_backup_failed(false)
    {
    }
    mock_virtual ~policy_context() {}

    void set_policy(const policy &p);
    policy get_policy();
    void add_backup_history(const backup_info &info);
    std::vector<backup_info> get_backup_infos(int cnt);
    bool is_under_backuping();
    mock_virtual void start();
    // The functions above will be called by others; before calling these functions, the _lock
    // of policy_context should be locked, otherwise a deadlock may occur.
    // NOTE(review): wording kept from the original comment — confirm whether callers must hold
    // _lock or whether these functions acquire it themselves.

    // clang-format off
mock_private :
    //
    // Update the progress of a partition.
    // The progress of the app and of the whole backup instance is updated accordingly.
    // If the whole backup instance is finished, it is synced to the remote storage.
    // NOTICE: the local "_cur_backup" is reset only after it has been successfully synced to
    //         remote storage, which happens in another task,
    //         so "_cur_backup" can be visited safely after this function call,
    //         as long as the _lock is held.
    //
    // Return: true if the partition is finished, otherwise false
    //
    mock_virtual bool
    update_partition_progress_unlocked(gpid pid, int32_t progress, const host_port &source);
    mock_virtual void record_partition_checkpoint_size_unlock(const gpid& pid, int64_t size);

    mock_virtual void start_backup_app_meta_unlocked(int32_t app_id);
    mock_virtual void start_backup_app_partitions_unlocked(int32_t app_id);
    mock_virtual void start_backup_partition_unlocked(gpid pid);
    // before finishing the backup of one app, a flag file is written to represent whether the
    // app's backup is finished
    mock_virtual void write_backup_app_finish_flag_unlocked(int32_t app_id,
                                                            dsn::task_ptr write_callback);
    mock_virtual void finish_backup_app_unlocked(int32_t app_id);
    // after finishing the backup of all apps, the information of the policy's backup is
    // recorded to the block filesystem
    mock_virtual void write_backup_info_unlocked(const backup_info &b_info,
                                                 dsn::task_ptr write_callback);

    mock_virtual void sync_backup_to_remote_storage_unlocked(const backup_info &b_info,
                                                             dsn::task_ptr sync_callback,
                                                             bool create_new_node);
    mock_virtual void initialize_backup_progress_unlocked();
    mock_virtual void prepare_current_backup_on_new_unlocked();
    mock_virtual void issue_new_backup_unlocked();
    // returns:
    //   - true, should start backup right now, otherwise don't start backup
    mock_virtual bool should_start_backup_unlocked();
    mock_virtual void continue_current_backup_unlocked();

    mock_virtual void on_backup_reply(dsn::error_code err,
                                      backup_response &&response,
                                      gpid pid,
                                      const host_port &primary);

    mock_virtual void gc_backup_info_unlocked(const backup_info &info_to_gc);
    mock_virtual void issue_gc_backup_info_task_unlocked();
    mock_virtual void sync_remove_backup_info(const backup_info &info, dsn::task_ptr sync_callback);

mock_private :
    friend class backup_service;
    backup_service *_backup_service;

    // lock the data-structure below
    dsn::zlock _lock;

    // policy related
    policy _policy;
    dist::block_service::block_filesystem *_block_service;

    // backup related
    backup_info _cur_backup;
    bool _is_backup_failed;
    // backup_id --> backup_info
    std::map<int64_t, backup_info> _backup_history;
    backup_progress _progress;
    std::string _backup_sig; // policy_name@backup_id, used when printing backup related logs

    std::unique_ptr<backup_policy_metrics> _metrics;
    // NOTE(review): the clang-format documentation spells the re-enable marker with a space
    // after the slashes; as written below, the marker presumably leaves formatting disabled
    // for the rest of the file. Correcting it means reformatting the declarations after it.
//clang-format on
    dsn::task_tracker _tracker;
};
class backup_service
{
public:
struct backup_opt
{
std::chrono::milliseconds meta_retry_delay_ms;
std::chrono::milliseconds block_retry_delay_ms;
std::chrono::milliseconds app_dropped_retry_delay_ms;
std::chrono::milliseconds reconfiguration_retry_delay_ms;
std::chrono::milliseconds request_backup_period_ms; // period that meta send backup command to replica
std::chrono::milliseconds issue_backup_interval_ms; // interval that meta try to issue a new backup
};
typedef std::function<std::shared_ptr<policy_context>(backup_service *)> policy_factory;
explicit backup_service(meta_service *meta_svc,
const std::string &policy_meta_root,
const std::string &backup_root,
const policy_factory &factory);
meta_service *get_meta_service() const { return _meta_svc; }
server_state *get_state() const { return _state; }
backup_opt &backup_option() { return _opt; }
void start();
const std::string &backup_root() const { return _backup_root; }
const std::string &policy_root() const { return _policy_meta_root; }
void add_backup_policy(dsn::message_ex* msg);
void query_backup_policy(query_backup_policy_rpc rpc);
void modify_backup_policy(configuration_modify_backup_policy_rpc rpc);
void start_backup_app(start_backup_app_rpc rpc);
void query_backup_status(query_backup_status_rpc rpc);
// compose the absolute path(AP) for policy
// input:
// -- root: the prefix of the AP
// return:
// the AP of this policy: <policy_meta_root>/<policy_name>
std::string get_policy_path(const std::string &policy_name);
// compose the absolute path(AP) for backup
// input:
// -- root: the prefix of the AP
// return:
// the AP of this backup: <policy_meta_root>/<policy_name>/<backup_id>
std::string get_backup_path(const std::string &policy_name, int64_t backup_id);
private:
friend class backup_service_test;
friend class meta_service_test_app;
FRIEND_TEST(backup_service_test, test_init_backup);
FRIEND_TEST(backup_service_test, test_query_backup_status);
FRIEND_TEST(meta_backup_service_test, test_add_backup_policy);
void start_create_policy_meta_root(dsn::task_ptr callback);
void start_sync_policies();
error_code sync_policies_from_remote_storage();
void do_add_policy(dsn::message_ex* req,
std::shared_ptr<policy_context> p,
const std::string &hint_msg);
void do_update_policy_to_remote_storage(configuration_modify_backup_policy_rpc rpc,
const policy &p,
std::shared_ptr<policy_context> &p_context_ptr);
bool is_valid_policy_name_unlocked(const std::string &policy_name);
policy_factory _factory;
meta_service *_meta_svc;
server_state *_state;
// lock _policy_states and _backup_states.
zlock _lock;
std::map<std::string, std::shared_ptr<policy_context>>
_policy_states; // policy_name -> policy_context
// _backup_states stores all states of one-time backup in the cluster, not persistence to ZK
std::vector<std::shared_ptr<backup_engine>> _backup_states;
// the root of policy metas, stored on remote_storage(zookeeper)
std::string _policy_meta_root;
// the root of cold backup data, stored on block service
std::string _backup_root;
backup_opt _opt;
std::atomic_bool _in_initialize;
dsn::task_tracker _tracker;
};
} // namespace replication
} // namespace dsn
// Invoked at global scope, after the dsn namespaces are closed, with the fully qualified type.
// Presumably registers a formatter for backup_start_time that relies on the operator<<
// defined on the struct — confirm against the macro definition (utils/fmt_utils.h is included
// above).
USER_DEFINED_STRUCTURE_FORMATTER(::dsn::replication::backup_start_time);