blob: 4ae299bb818473fd08b398d95cd027eb7d861aad [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <fmt/core.h>
#include <rapidjson/ostreamwrapper.h>
#include <algorithm>
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>
#include "backup_types.h"
#include "bulk_load_types.h"
#include "common//duplication_common.h"
#include "common/bulk_load_common.h"
#include "common/gpid.h"
#include "common/replica_envs.h"
#include "common/replication.codes.h"
#include "common/replication_common.h"
#include "common/replication_enums.h"
#include "common/serialization_helper/dsn.layer2_types.h"
#include "duplication_types.h"
#include "http/http_status_code.h"
#include "meta/duplication/meta_duplication_service.h"
#include "meta/meta_backup_service.h"
#include "meta/meta_bulk_load_service.h"
#include "meta/meta_rpc_types.h"
#include "meta/meta_service.h"
#include "meta_admin_types.h"
#include "meta_http_service.h"
#include "meta_server_failure_detector.h"
#include "runtime/api_layer1.h"
#include "runtime/rpc/rpc_host_port.h"
#include "server_load_balancer.h"
#include "server_state.h"
#include "utils/error_code.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/output_utils.h"
#include "utils/time_utils.h"
DSN_DECLARE_string(hosts_list);
namespace dsn {
namespace replication {
struct list_nodes_helper
{
std::string node_address;
std::string node_status;
int primary_count;
int secondary_count;
list_nodes_helper(const std::string &a, const std::string &s)
: node_address(a), node_status(s), primary_count(0), secondary_count(0)
{
}
};
void meta_http_service::get_app_handler(const http_request &req, http_response &resp)
{
std::string app_name;
bool detailed = false;
for (const auto &p : req.query_args) {
if (p.first == "name") {
app_name = p.second;
} else if (p.first == "detail") {
detailed = true;
} else {
resp.status_code = http_status_code::kBadRequest;
return;
}
}
if (!redirect_if_not_primary(req, resp))
return;
if (app_name.empty()) {
resp.status_code = http_status_code::kBadRequest;
resp.body = "app name shouldn't be empty";
return;
}
query_cfg_request request;
query_cfg_response response;
request.app_name = app_name;
_service->_state->query_configuration_by_index(request, response);
if (response.err == ERR_OBJECT_NOT_FOUND) {
resp.status_code = http_status_code::kNotFound;
resp.body = fmt::format("table not found: \"{}\"", app_name);
return;
}
if (response.err != dsn::ERR_OK) {
resp.body = response.err;
resp.status_code = http_status_code::kInternalServerError;
return;
}
// output as json format
dsn::utils::multi_table_printer mtp;
std::ostringstream out;
dsn::utils::table_printer tp_general("general");
tp_general.add_row_name_and_data("app_name", app_name);
tp_general.add_row_name_and_data("app_id", response.app_id);
tp_general.add_row_name_and_data("partition_count", response.partition_count);
if (!response.partitions.empty()) {
tp_general.add_row_name_and_data("max_replica_count",
response.partitions[0].max_replica_count);
} else {
tp_general.add_row_name_and_data("max_replica_count", 0);
}
mtp.add(std::move(tp_general));
if (detailed) {
dsn::utils::table_printer tp_details("replicas");
tp_details.add_title("pidx");
tp_details.add_column("ballot");
tp_details.add_column("replica_count");
tp_details.add_column("primary");
tp_details.add_column("secondaries");
std::map<host_port, std::pair<int, int>> node_stat;
int total_prim_count = 0;
int total_sec_count = 0;
int fully_healthy = 0;
int write_unhealthy = 0;
int read_unhealthy = 0;
for (const auto &p : response.partitions) {
int replica_count = 0;
if (!p.hp_primary.is_invalid()) {
replica_count++;
node_stat[p.hp_primary].first++;
total_prim_count++;
}
replica_count += p.hp_secondaries.size();
total_sec_count += p.hp_secondaries.size();
if (!p.hp_primary.is_invalid()) {
if (replica_count >= p.max_replica_count)
fully_healthy++;
else if (replica_count < 2)
write_unhealthy++;
} else {
write_unhealthy++;
read_unhealthy++;
}
tp_details.add_row(p.pid.get_partition_index());
tp_details.append_data(p.ballot);
std::stringstream oss;
oss << replica_count << "/" << p.max_replica_count;
tp_details.append_data(oss.str());
tp_details.append_data((p.hp_primary.is_invalid() ? "-" : p.hp_primary.to_string()));
oss.str("");
oss << "[";
for (int j = 0; j < p.hp_secondaries.size(); j++) {
if (j != 0)
oss << ",";
oss << p.hp_secondaries[j];
node_stat[p.hp_secondaries[j]].second++;
}
oss << "]";
tp_details.append_data(oss.str());
}
mtp.add(std::move(tp_details));
// 'node' section.
dsn::utils::table_printer tp_nodes("nodes");
tp_nodes.add_title("node");
tp_nodes.add_column("primary");
tp_nodes.add_column("secondary");
tp_nodes.add_column("total");
for (auto &kv : node_stat) {
tp_nodes.add_row(kv.first.to_string());
tp_nodes.append_data(kv.second.first);
tp_nodes.append_data(kv.second.second);
tp_nodes.append_data(kv.second.first + kv.second.second);
}
tp_nodes.add_row("total");
tp_nodes.append_data(total_prim_count);
tp_nodes.append_data(total_sec_count);
tp_nodes.append_data(total_prim_count + total_sec_count);
mtp.add(std::move(tp_nodes));
// healthy partition count section.
dsn::utils::table_printer tp_hpc("healthy");
tp_hpc.add_row_name_and_data("fully_healthy_partition_count", fully_healthy);
tp_hpc.add_row_name_and_data("unhealthy_partition_count",
response.partition_count - fully_healthy);
tp_hpc.add_row_name_and_data("write_unhealthy_partition_count", write_unhealthy);
tp_hpc.add_row_name_and_data("read_unhealthy_partition_count", read_unhealthy);
mtp.add(std::move(tp_hpc));
}
mtp.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
resp.body = out.str();
resp.status_code = http_status_code::kOk;
}
void meta_http_service::list_app_handler(const http_request &req, http_response &resp)
{
bool detailed = false;
for (const auto &p : req.query_args) {
if (p.first == "detail") {
detailed = true;
} else {
resp.status_code = http_status_code::kBadRequest;
return;
}
}
if (!redirect_if_not_primary(req, resp))
return;
configuration_list_apps_response response;
configuration_list_apps_request request;
request.status = dsn::app_status::AS_INVALID;
_service->_state->list_apps(request, response);
if (response.err != dsn::ERR_OK) {
resp.body = response.err;
resp.status_code = http_status_code::kInternalServerError;
return;
}
std::vector<::dsn::app_info> &apps = response.infos;
// output as json format
std::ostringstream out;
dsn::utils::multi_table_printer mtp;
int available_app_count = 0;
dsn::utils::table_printer tp_general("general_info");
tp_general.add_title("app_id");
tp_general.add_column("status");
tp_general.add_column("app_name");
tp_general.add_column("app_type");
tp_general.add_column("partition_count");
tp_general.add_column("replica_count");
tp_general.add_column("is_stateful");
tp_general.add_column("create_time");
tp_general.add_column("drop_time");
tp_general.add_column("drop_expire");
tp_general.add_column("envs_count");
for (const auto &app : apps) {
if (app.status != dsn::app_status::AS_AVAILABLE) {
continue;
}
std::string status_str = enum_to_string(app.status);
status_str = status_str.substr(status_str.find("AS_") + 3);
std::string create_time = "-";
if (app.create_second > 0) {
char buf[24] = {0};
dsn::utils::time_ms_to_string((uint64_t)app.create_second * 1000, buf);
create_time = buf;
}
std::string drop_time = "-";
std::string drop_expire_time = "-";
if (app.status == app_status::AS_AVAILABLE) {
available_app_count++;
} else if (app.status == app_status::AS_DROPPED && app.expire_second > 0) {
if (app.drop_second > 0) {
char buf[24] = {0};
dsn::utils::time_ms_to_string((uint64_t)app.drop_second * 1000, buf);
drop_time = buf;
}
if (app.expire_second > 0) {
char buf[24] = {0};
dsn::utils::time_ms_to_string((uint64_t)app.expire_second * 1000, buf);
drop_expire_time = buf;
}
}
tp_general.add_row(app.app_id);
tp_general.append_data(status_str);
tp_general.append_data(app.app_name);
tp_general.append_data(app.app_type);
tp_general.append_data(app.partition_count);
tp_general.append_data(app.max_replica_count);
tp_general.append_data(app.is_stateful);
tp_general.append_data(create_time);
tp_general.append_data(drop_time);
tp_general.append_data(drop_expire_time);
tp_general.append_data(app.envs.size());
}
mtp.add(std::move(tp_general));
int total_fully_healthy_app_count = 0;
int total_unhealthy_app_count = 0;
int total_write_unhealthy_app_count = 0;
int total_read_unhealthy_app_count = 0;
if (detailed && available_app_count > 0) {
dsn::utils::table_printer tp_health("healthy_info");
tp_health.add_title("app_id");
tp_health.add_column("app_name");
tp_health.add_column("partition_count");
tp_health.add_column("fully_healthy");
tp_health.add_column("unhealthy");
tp_health.add_column("write_unhealthy");
tp_health.add_column("read_unhealthy");
for (auto &info : apps) {
if (info.status != app_status::AS_AVAILABLE) {
continue;
}
query_cfg_request request;
query_cfg_response response;
request.app_name = info.app_name;
_service->_state->query_configuration_by_index(request, response);
CHECK_EQ(info.app_id, response.app_id);
CHECK_EQ(info.partition_count, response.partition_count);
int fully_healthy = 0;
int write_unhealthy = 0;
int read_unhealthy = 0;
for (int i = 0; i < response.partitions.size(); i++) {
const dsn::partition_configuration &p = response.partitions[i];
int replica_count = 0;
if (!p.hp_primary.is_invalid()) {
replica_count++;
}
replica_count += p.hp_secondaries.size();
if (!p.hp_primary.is_invalid()) {
if (replica_count >= p.max_replica_count)
fully_healthy++;
else if (replica_count < 2)
write_unhealthy++;
} else {
write_unhealthy++;
read_unhealthy++;
}
}
tp_health.add_row(info.app_id);
tp_health.append_data(info.app_name);
tp_health.append_data(info.partition_count);
tp_health.append_data(fully_healthy);
tp_health.append_data(info.partition_count - fully_healthy);
tp_health.append_data(write_unhealthy);
tp_health.append_data(read_unhealthy);
if (fully_healthy == info.partition_count)
total_fully_healthy_app_count++;
else
total_unhealthy_app_count++;
if (write_unhealthy > 0)
total_write_unhealthy_app_count++;
if (read_unhealthy > 0)
total_read_unhealthy_app_count++;
}
mtp.add(std::move(tp_health));
}
dsn::utils::table_printer tp_count("summary");
tp_count.add_row_name_and_data("total_app_count", available_app_count);
if (detailed && available_app_count > 0) {
tp_count.add_row_name_and_data("fully_healthy_app_count", total_fully_healthy_app_count);
tp_count.add_row_name_and_data("unhealthy_app_count", total_unhealthy_app_count);
tp_count.add_row_name_and_data("write_unhealthy_app_count",
total_write_unhealthy_app_count);
tp_count.add_row_name_and_data("read_unhealthy_app_count", total_read_unhealthy_app_count);
}
mtp.add(std::move(tp_count));
mtp.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
resp.body = out.str();
resp.status_code = http_status_code::kOk;
}
void meta_http_service::list_node_handler(const http_request &req, http_response &resp)
{
bool detailed = false;
for (const auto &p : req.query_args) {
if (p.first == "detail") {
detailed = true;
} else {
resp.status_code = http_status_code::kBadRequest;
return;
}
}
if (!redirect_if_not_primary(req, resp))
return;
std::map<dsn::host_port, list_nodes_helper> tmp_map;
for (const auto &node : _service->_alive_set) {
tmp_map.emplace(node, list_nodes_helper(node.to_string(), "ALIVE"));
}
for (const auto &node : _service->_dead_set) {
tmp_map.emplace(node, list_nodes_helper(node.to_string(), "UNALIVE"));
}
int alive_node_count = (_service->_alive_set).size();
int unalive_node_count = (_service->_dead_set).size();
if (detailed) {
configuration_list_apps_response response;
configuration_list_apps_request request;
request.status = dsn::app_status::AS_AVAILABLE;
_service->_state->list_apps(request, response);
for (const auto &app : response.infos) {
query_cfg_request request_app;
query_cfg_response response_app;
request_app.app_name = app.app_name;
_service->_state->query_configuration_by_index(request_app, response_app);
CHECK_EQ(app.app_id, response_app.app_id);
CHECK_EQ(app.partition_count, response_app.partition_count);
for (int i = 0; i < response_app.partitions.size(); i++) {
const dsn::partition_configuration &p = response_app.partitions[i];
if (!p.hp_primary.is_invalid()) {
auto find = tmp_map.find(p.hp_primary);
if (find != tmp_map.end()) {
find->second.primary_count++;
}
}
for (int j = 0; j < p.hp_secondaries.size(); j++) {
auto find = tmp_map.find(p.hp_secondaries[j]);
if (find != tmp_map.end()) {
find->second.secondary_count++;
}
}
}
}
}
// output as json format
std::ostringstream out;
dsn::utils::multi_table_printer mtp;
dsn::utils::table_printer tp_details("details");
tp_details.add_title("address");
tp_details.add_column("status");
if (detailed) {
tp_details.add_column("replica_count");
tp_details.add_column("primary_count");
tp_details.add_column("secondary_count");
}
for (const auto &kv : tmp_map) {
tp_details.add_row(kv.second.node_address);
tp_details.append_data(kv.second.node_status);
if (detailed) {
tp_details.append_data(kv.second.primary_count + kv.second.secondary_count);
tp_details.append_data(kv.second.primary_count);
tp_details.append_data(kv.second.secondary_count);
}
}
mtp.add(std::move(tp_details));
dsn::utils::table_printer tp_count("summary");
tp_count.add_row_name_and_data("total_node_count", alive_node_count + unalive_node_count);
tp_count.add_row_name_and_data("alive_node_count", alive_node_count);
tp_count.add_row_name_and_data("unalive_node_count", unalive_node_count);
mtp.add(std::move(tp_count));
mtp.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
resp.body = out.str();
resp.status_code = http_status_code::kOk;
}
void meta_http_service::get_cluster_info_handler(const http_request &req, http_response &resp)
{
if (!redirect_if_not_primary(req, resp)) {
return;
}
dsn::utils::table_printer tp;
std::ostringstream out;
std::string meta_servers_str;
int ms_size = _service->_opts.meta_servers.size();
for (int i = 0; i < ms_size; i++) {
meta_servers_str += _service->_opts.meta_servers[i].to_string();
if (i != ms_size - 1) {
meta_servers_str += ",";
}
}
tp.add_row_name_and_data("meta_servers", meta_servers_str);
tp.add_row_name_and_data("primary_meta_server", dsn_primary_host_port().to_string());
tp.add_row_name_and_data("zookeeper_hosts", FLAGS_hosts_list);
tp.add_row_name_and_data("zookeeper_root", _service->_cluster_root);
tp.add_row_name_and_data(
"meta_function_level",
_meta_function_level_VALUES_TO_NAMES.find(_service->get_function_level())->second + 3);
std::vector<std::string> balance_operation_type;
balance_operation_type.emplace_back("detail");
tp.add_row_name_and_data(
"balance_operation_count",
_service->_balancer->get_balance_operation_count(balance_operation_type));
double primary_stddev, total_stddev;
_service->_state->get_cluster_balance_score(primary_stddev, total_stddev);
tp.add_row_name_and_data("primary_replica_count_stddev", primary_stddev);
tp.add_row_name_and_data("total_replica_count_stddev", total_stddev);
tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
resp.body = out.str();
resp.status_code = http_status_code::kOk;
}
void meta_http_service::get_app_envs_handler(const http_request &req, http_response &resp)
{
// only primary process the request
if (!redirect_if_not_primary(req, resp))
return;
std::string app_name;
for (const auto &p : req.query_args) {
if ("name" == p.first) {
app_name = p.second;
break;
}
}
if (app_name.empty()) {
resp.status_code = http_status_code::kBadRequest;
resp.body = "app name shouldn't be empty";
return;
}
// get all of the apps
configuration_list_apps_response response;
configuration_list_apps_request request;
request.status = dsn::app_status::AS_AVAILABLE;
_service->_state->list_apps(request, response);
if (response.err != dsn::ERR_OK) {
resp.body = response.err.to_string();
resp.status_code = http_status_code::kInternalServerError;
return;
}
// using app envs to generate a table_printer
dsn::utils::table_printer tp;
for (auto &app : response.infos) {
if (app.app_name == app_name) {
for (auto env : app.envs) {
tp.add_row_name_and_data(env.first, env.second);
}
break;
}
}
// output as json format
std::ostringstream out;
tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
resp.body = out.str();
resp.status_code = http_status_code::kOk;
}
std::string set_to_string(const std::set<int32_t> &s)
{
std::stringstream out;
rapidjson::OStreamWrapper wrapper(out);
dsn::json::JsonWriter writer(wrapper);
dsn::json::json_encode(writer, s);
return out.str();
}
void meta_http_service::query_backup_policy_handler(const http_request &req, http_response &resp)
{
if (!redirect_if_not_primary(req, resp))
return;
if (_service->_backup_handler == nullptr) {
resp.body = "cold_backup_disabled";
resp.status_code = http_status_code::kNotFound;
return;
}
auto request = std::make_unique<configuration_query_backup_policy_request>();
std::vector<std::string> policy_names;
for (const auto &p : req.query_args) {
if (p.first == "name") {
policy_names.push_back(p.second);
} else {
resp.body = "Invalid parameter";
resp.status_code = http_status_code::kBadRequest;
return;
}
}
request->policy_names = std::move(policy_names);
query_backup_policy_rpc http_to_rpc(std::move(request), LPC_DEFAULT_CALLBACK);
_service->_backup_handler->query_backup_policy(http_to_rpc);
auto rpc_return = http_to_rpc.response();
dsn::utils::table_printer tp_query_backup_policy;
tp_query_backup_policy.add_title("name");
tp_query_backup_policy.add_column("backup_provider_type");
tp_query_backup_policy.add_column("backup_interval");
tp_query_backup_policy.add_column("app_ids");
tp_query_backup_policy.add_column("start_time");
tp_query_backup_policy.add_column("status");
tp_query_backup_policy.add_column("backup_history_count");
for (const auto &cur_policy : rpc_return.policys) {
tp_query_backup_policy.add_row(cur_policy.policy_name);
tp_query_backup_policy.append_data(cur_policy.backup_provider_type);
tp_query_backup_policy.append_data(cur_policy.backup_interval_seconds);
tp_query_backup_policy.append_data(set_to_string(cur_policy.app_ids));
tp_query_backup_policy.append_data(cur_policy.start_time);
tp_query_backup_policy.append_data(cur_policy.is_disable ? "disabled" : "enabled");
tp_query_backup_policy.append_data(cur_policy.backup_history_count_to_keep);
}
std::ostringstream out;
tp_query_backup_policy.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
resp.body = out.str();
resp.status_code = http_status_code::kOk;
}
void meta_http_service::query_duplication_handler(const http_request &req, http_response &resp)
{
if (!redirect_if_not_primary(req, resp)) {
return;
}
if (_service->_dup_svc == nullptr) {
resp.body = "duplication is not enabled [FLAGS_duplication_enabled=false]";
resp.status_code = http_status_code::kNotFound;
return;
}
duplication_query_request rpc_req;
auto it = req.query_args.find("name");
if (it == req.query_args.end()) {
resp.body = "name should not be empty";
resp.status_code = http_status_code::kBadRequest;
return;
}
rpc_req.app_name = it->second;
duplication_query_response rpc_resp;
_service->_dup_svc->query_duplication_info(rpc_req, rpc_resp);
if (rpc_resp.err != ERR_OK) {
resp.body = rpc_resp.err.to_string();
if (rpc_resp.err == ERR_APP_NOT_EXIST) {
resp.status_code = http_status_code::kNotFound;
} else {
resp.status_code = http_status_code::kInternalServerError;
}
return;
}
resp.status_code = http_status_code::kOk;
resp.body = duplication_query_response_to_string(rpc_resp);
}
void meta_http_service::start_bulk_load_handler(const http_request &req, http_response &resp)
{
if (!redirect_if_not_primary(req, resp)) {
return;
}
if (_service->_bulk_load_svc == nullptr) {
resp.body = "bulk load is not enabled";
resp.status_code = http_status_code::kNotFound;
return;
}
start_bulk_load_request request;
bool ret = json::json_forwarder<start_bulk_load_request>::decode(req.body, request);
if (!ret) {
resp.body = "invalid request structure";
resp.status_code = http_status_code::kBadRequest;
return;
}
if (request.app_name.empty()) {
resp.body = "app_name should not be empty";
resp.status_code = http_status_code::kBadRequest;
return;
}
if (request.cluster_name.empty()) {
resp.body = "cluster_name should not be empty";
resp.status_code = http_status_code::kBadRequest;
return;
}
if (request.file_provider_type.empty()) {
resp.body = "file_provider_type should not be empty";
resp.status_code = http_status_code::kBadRequest;
return;
}
if (request.remote_root_path.empty()) {
resp.body = "remote_root_path should not be empty";
resp.status_code = http_status_code::kBadRequest;
return;
}
// TODO(yingchun): Also deal the 'ingest_behind' parameter.
auto rpc_req = std::make_unique<start_bulk_load_request>(request);
start_bulk_load_rpc rpc(std::move(rpc_req), LPC_META_CALLBACK);
_service->_bulk_load_svc->on_start_bulk_load(rpc);
auto rpc_resp = rpc.response();
// output as json format
dsn::utils::table_printer tp;
tp.add_row_name_and_data("error", rpc_resp.err.to_string());
tp.add_row_name_and_data("hint_msg", rpc_resp.hint_msg);
std::ostringstream out;
tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
resp.body = out.str();
resp.status_code = http_status_code::kOk;
}
void meta_http_service::query_bulk_load_handler(const http_request &req, http_response &resp)
{
if (!redirect_if_not_primary(req, resp)) {
return;
}
if (_service->_bulk_load_svc == nullptr) {
resp.body = "bulk load is not enabled";
resp.status_code = http_status_code::kNotFound;
return;
}
auto it = req.query_args.find("name");
if (it == req.query_args.end()) {
resp.body = "name should not be empty";
resp.status_code = http_status_code::kBadRequest;
return;
}
auto rpc_req = std::make_unique<query_bulk_load_request>();
rpc_req->app_name = it->second;
query_bulk_load_rpc rpc(std::move(rpc_req), LPC_META_CALLBACK);
_service->_bulk_load_svc->on_query_bulk_load_status(rpc);
auto rpc_resp = rpc.response();
// output as json format
dsn::utils::table_printer tp;
tp.add_row_name_and_data("error", rpc_resp.err.to_string());
tp.add_row_name_and_data("app_status", dsn::enum_to_string(rpc_resp.app_status));
std::ostringstream out;
tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
resp.body = out.str();
resp.status_code = http_status_code::kOk;
}
void meta_http_service::start_compaction_handler(const http_request &req, http_response &resp)
{
if (!redirect_if_not_primary(req, resp)) {
return;
}
// validate parameters
manual_compaction_info info;
bool ret = json::json_forwarder<manual_compaction_info>::decode(req.body, info);
if (!ret) {
resp.body = "invalid request structure";
resp.status_code = http_status_code::kBadRequest;
return;
}
if (info.app_name.empty()) {
resp.body = "app_name should not be empty";
resp.status_code = http_status_code::kBadRequest;
return;
}
if (info.type.empty() || (info.type != "once" && info.type != "periodic")) {
resp.body = "type should ony be 'once' or 'periodic'";
resp.status_code = http_status_code::kBadRequest;
return;
}
if (info.target_level < -1) {
resp.body = "target_level should be >= -1";
resp.status_code = http_status_code::kBadRequest;
return;
}
if (info.bottommost_level_compaction.empty() || (info.bottommost_level_compaction != "skip" &&
info.bottommost_level_compaction != "force")) {
resp.body = "bottommost_level_compaction should ony be 'skip' or 'force'";
resp.status_code = http_status_code::kBadRequest;
return;
}
if (info.max_concurrent_running_count < 0) {
resp.body = "max_running_count should be >= 0";
resp.status_code = http_status_code::kBadRequest;
return;
}
if (info.type == "periodic" && info.trigger_time.empty()) {
resp.body = "trigger_time should not be empty when type is periodic";
resp.status_code = http_status_code::kBadRequest;
return;
}
// create configuration_update_app_env_request
std::vector<std::string> keys;
std::vector<std::string> values;
if (info.type == "once") {
keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_TARGET_LEVEL);
keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_BOTTOMMOST_LEVEL_COMPACTION);
keys.emplace_back(replica_envs::MANUAL_COMPACT_ONCE_TRIGGER_TIME);
} else {
keys.emplace_back(replica_envs::MANUAL_COMPACT_PERIODIC_TARGET_LEVEL);
keys.emplace_back(replica_envs::MANUAL_COMPACT_PERIODIC_BOTTOMMOST_LEVEL_COMPACTION);
keys.emplace_back(replica_envs::MANUAL_COMPACT_PERIODIC_TRIGGER_TIME);
}
values.emplace_back(std::to_string(info.target_level));
values.emplace_back(info.bottommost_level_compaction);
values.emplace_back(info.type == "once" ? std::to_string(dsn_now_s()) : info.trigger_time);
if (info.max_concurrent_running_count > 0) {
keys.emplace_back(replica_envs::MANUAL_COMPACT_MAX_CONCURRENT_RUNNING_COUNT);
values.emplace_back(std::to_string(info.max_concurrent_running_count));
}
update_app_env(info.app_name, keys, values, resp);
}
void meta_http_service::update_scenario_handler(const http_request &req, http_response &resp)
{
if (!redirect_if_not_primary(req, resp)) {
return;
}
// validate parameters
usage_scenario_info info;
bool ret = json::json_forwarder<usage_scenario_info>::decode(req.body, info);
if (!ret) {
resp.body = "invalid request structure";
resp.status_code = http_status_code::kBadRequest;
return;
}
if (info.app_name.empty()) {
resp.body = "app_name should not be empty";
resp.status_code = http_status_code::kBadRequest;
return;
}
if (info.scenario.empty() || (info.scenario != "bulk_load" && info.scenario != "normal")) {
resp.body = "scenario should ony be 'normal' or 'bulk_load'";
resp.status_code = http_status_code::kBadRequest;
return;
}
// create configuration_update_app_env_request
std::vector<std::string> keys;
std::vector<std::string> values;
keys.emplace_back(replica_envs::ROCKSDB_USAGE_SCENARIO);
values.emplace_back(info.scenario);
update_app_env(info.app_name, keys, values, resp);
}
bool meta_http_service::redirect_if_not_primary(const http_request &req, http_response &resp)
{
#ifdef MOCK_TEST
// To enable MOCK_TEST, the option BUILD_TEST for cmake should be opened by:
// cmake -DBUILD_TEST=ON ...
// which could be done by building Pegasus with tests by:
// ./run.sh build --test ...
//
// If `_service->_balancer` is not null, it must has been initialized by mocking, in which case
// just returning true is ok.
//
// Otherwise, once `_service->_balancer` is null, which means this must be a standby meta
// server, returning true would lead to coredump due to null `_service->_balancer` while
// processing requests in `get_cluster_info_handler`. Thus it should go through the following
// normal process instead of just returning true.
if (_service->_balancer) {
return true;
}
#endif
host_port leader;
if (_service->_failure_detector->get_leader(&leader)) {
return true;
}
// set redirect response
resp.location = fmt::format("http://{}/{}", leader, req.path);
if (!req.query_args.empty()) {
resp.location += '?';
for (const auto &i : req.query_args) {
resp.location += i.first + '=' + i.second + '&';
}
resp.location.pop_back(); // remove final '&'
}
resp.location.erase(std::remove(resp.location.begin(), resp.location.end(), '\0'),
resp.location.end()); // remove final '\0'
resp.status_code = http_status_code::kTemporaryRedirect;
return false;
}
void meta_http_service::update_app_env(const std::string &app_name,
const std::vector<std::string> &keys,
const std::vector<std::string> &values,
http_response &resp)
{
configuration_update_app_env_request request;
request.app_name = app_name;
request.op = app_env_operation::APP_ENV_OP_SET;
request.__set_keys(keys);
request.__set_values(values);
auto rpc_req = std::make_unique<configuration_update_app_env_request>(request);
update_app_env_rpc rpc(std::move(rpc_req), LPC_META_STATE_NORMAL);
_service->_state->set_app_envs(rpc);
auto rpc_resp = rpc.response();
// output as json format
dsn::utils::table_printer tp;
tp.add_row_name_and_data("error", rpc_resp.err.to_string());
tp.add_row_name_and_data("hint_message", rpc_resp.hint_message);
std::ostringstream out;
tp.output(out, dsn::utils::table_printer::output_format::kJsonCompact);
resp.body = out.str();
resp.status_code = http_status_code::kOk;
}
} // namespace replication
} // namespace dsn