fix(backup): validate backup policy name (#2029)

Because the backup policy name is used as a metric name and label, it must
not contain any invalid characters (e.g. `-`); otherwise, it leads to a crash.

This patch adds a validator to ensure that the backup policy name contains
only allowed characters.
diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt
index 95ecb3b..eb5ff8d 100644
--- a/src/meta/CMakeLists.txt
+++ b/src/meta/CMakeLists.txt
@@ -39,6 +39,7 @@
         dsn_http
         dsn_runtime
         dsn_aio
+        prometheus-cpp-core
         zookeeper
         hashtable
         hdfs
diff --git a/src/meta/meta_backup_service.cpp b/src/meta/meta_backup_service.cpp
index 5f8d04b..84b9cd7 100644
--- a/src/meta/meta_backup_service.cpp
+++ b/src/meta/meta_backup_service.cpp
@@ -19,6 +19,7 @@
 #include <boost/cstdint.hpp>
 #include <boost/lexical_cast.hpp>
 #include <fmt/core.h>
+#include <prometheus/check_names.h>
 #include <algorithm>
 #include <iterator>
 #include <type_traits>
@@ -41,9 +42,9 @@
 #include "runtime/rpc/rpc_holder.h"
 #include "runtime/rpc/rpc_message.h"
 #include "runtime/rpc/serialization.h"
-#include "security/access_controller.h"
 #include "runtime/task/async_calls.h"
 #include "runtime/task/task_code.h"
+#include "security/access_controller.h"
 #include "server_state.h"
 #include "utils/autoref_ptr.h"
 #include "utils/blob.h"
@@ -1349,9 +1350,10 @@
     {
         // check policy name
         zauto_lock l(_lock);
-        if (!is_valid_policy_name_unlocked(request.policy_name)) {
+        if (!is_valid_policy_name_unlocked(request.policy_name, hint_message)) {
             response.err = ERR_INVALID_PARAMETERS;
-            response.hint_message = "invalid policy_name: " + request.policy_name;
+            response.hint_message =
+                fmt::format("invalid policy name: '{}', {}", request.policy_name, hint_message);
             _meta_svc->reply_data(msg, response);
             msg->release_ref();
             return;
@@ -1459,10 +1461,40 @@
         });
 }
 
-bool backup_service::is_valid_policy_name_unlocked(const std::string &policy_name)
+bool backup_service::is_valid_policy_name_unlocked(const std::string &policy_name,
+                                                   std::string &hint_message)
 {
-    auto iter = _policy_states.find(policy_name);
-    return (iter == _policy_states.end());
+    // The policy name must not be the same as BACKUP_INFO: both appear at the same level of the
+    // output when querying the policy details, so distinct names are needed to tell their
+    // contents apart.
+    static const std::set<std::string> kReservedNames = {cold_backup_constant::BACKUP_INFO};
+    if (kReservedNames.count(policy_name) == 1) {
+        hint_message = "policy name is reserved";
+        return false;
+    }
+
+    // Validate the policy name as a metric name in prometheus.
+    if (!prometheus::CheckMetricName(policy_name)) {
+        hint_message = "policy name should match regex '[a-zA-Z_:][a-zA-Z0-9_:]*' when act as a "
+                       "metric name in prometheus";
+        return false;
+    }
+
+    // Validate the policy name as a metric label in prometheus.
+    if (!prometheus::CheckLabelName(policy_name, prometheus::MetricType::Gauge)) {
+        hint_message = "policy name should match regex '[a-zA-Z_][a-zA-Z0-9_]*' when act as a "
+                       "metric label in prometheus";
+        return false;
+    }
+
+    const auto iter = _policy_states.find(policy_name);
+    if (iter != _policy_states.end()) {
+        hint_message = "policy name is already exist";
+        return false;
+    }
+
+    hint_message.clear();
+    return true;
 }
 
 void backup_service::query_backup_policy(query_backup_policy_rpc rpc)
diff --git a/src/meta/meta_backup_service.h b/src/meta/meta_backup_service.h
index f767ad2..d3bf9fa 100644
--- a/src/meta/meta_backup_service.h
+++ b/src/meta/meta_backup_service.h
@@ -407,6 +407,7 @@
 
     FRIEND_TEST(backup_service_test, test_init_backup);
     FRIEND_TEST(backup_service_test, test_query_backup_status);
+    FRIEND_TEST(backup_service_test, test_valid_policy_name);
     FRIEND_TEST(meta_backup_service_test, test_add_backup_policy);
 
     void start_create_policy_meta_root(dsn::task_ptr callback);
@@ -420,7 +421,7 @@
                                             const policy &p,
                                             std::shared_ptr<policy_context> &p_context_ptr);
 
-    bool is_valid_policy_name_unlocked(const std::string &policy_name);
+    bool is_valid_policy_name_unlocked(const std::string &policy_name, std::string &hint_message);
 
     policy_factory _factory;
     meta_service *_meta_svc;
diff --git a/src/meta/test/meta_backup_test.cpp b/src/meta/test/meta_backup_test.cpp
index 5a77b13..4d1f214 100644
--- a/src/meta/test/meta_backup_test.cpp
+++ b/src/meta/test/meta_backup_test.cpp
@@ -19,6 +19,7 @@
 #include <map>
 #include <memory>
 #include <string>
+// IWYU pragma: no_include <type_traits>
 #include <utility>
 #include <vector>
 
@@ -204,6 +205,31 @@
     ASSERT_EQ(1, resp.backup_items.size());
 }
 
+TEST_F(backup_service_test, test_valid_policy_name)
+{
+    std::string hint_message;
+    ASSERT_FALSE(_backup_service->is_valid_policy_name_unlocked(cold_backup_constant::BACKUP_INFO,
+                                                                hint_message));
+    ASSERT_EQ("policy name is reserved", hint_message);
+
+    ASSERT_FALSE(_backup_service->is_valid_policy_name_unlocked("bad-policy-name", hint_message));
+    ASSERT_EQ("policy name should match regex '[a-zA-Z_:][a-zA-Z0-9_:]*' when act as a metric name "
+              "in prometheus",
+              hint_message);
+
+    ASSERT_FALSE(_backup_service->is_valid_policy_name_unlocked("bad_policy_name:", hint_message));
+    ASSERT_EQ("policy name should match regex '[a-zA-Z_][a-zA-Z0-9_]*' when act as a metric label "
+              "in prometheus",
+              hint_message);
+
+    _backup_service->_policy_states.insert(std::make_pair("exist_policy_name", nullptr));
+    ASSERT_FALSE(_backup_service->is_valid_policy_name_unlocked("exist_policy_name", hint_message));
+    ASSERT_EQ("policy name is already exist", hint_message);
+
+    ASSERT_TRUE(_backup_service->is_valid_policy_name_unlocked("new_policy_name0", hint_message));
+    ASSERT_TRUE(hint_message.empty());
+}
+
 class backup_engine_test : public meta_test_base
 {
 public: