| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "base/pegasus_rpc_types.h" |
| #include "pegasus_write_service.h" |
| #include "pegasus_write_service_impl.h" |
| #include "capacity_unit_calculator.h" |
| |
| #include <dsn/cpp/message_utils.h> |
| #include <dsn/dist/replication/replication.codes.h> |
| #include <dsn/utility/defer.h> |
| |
| namespace pegasus { |
| namespace server { |
| |
// Task code used by pegasus_write_service::ingestion_files() to enqueue the
// asynchronous bulk-load file ingestion; the task runs with TASK_PRIORITY_COMMON
// on THREAD_POOL_INGESTION.
DEFINE_TASK_CODE(LPC_INGESTION, TASK_PRIORITY_COMMON, THREAD_POOL_INGESTION)
| |
| pegasus_write_service::pegasus_write_service(pegasus_server_impl *server) |
| : replica_base(server), |
| _server(server), |
| _impl(new impl(server)), |
| _batch_start_time(0), |
| _cu_calculator(server->_cu_calculator.get()) |
| { |
| std::string str_gpid = fmt::format("{}", server->get_gpid()); |
| |
| std::string name; |
| |
| name = fmt::format("put_qps@{}", str_gpid); |
| _pfc_put_qps.init_app_counter( |
| "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of PUT request"); |
| |
| name = fmt::format("multi_put_qps@{}", str_gpid); |
| _pfc_multi_put_qps.init_app_counter( |
| "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of MULTI_PUT request"); |
| |
| name = fmt::format("remove_qps@{}", str_gpid); |
| _pfc_remove_qps.init_app_counter( |
| "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of REMOVE request"); |
| |
| name = fmt::format("multi_remove_qps@{}", str_gpid); |
| _pfc_multi_remove_qps.init_app_counter("app.pegasus", |
| name.c_str(), |
| COUNTER_TYPE_RATE, |
| "statistic the qps of MULTI_REMOVE request"); |
| |
| name = fmt::format("incr_qps@{}", str_gpid); |
| _pfc_incr_qps.init_app_counter( |
| "app.pegasus", name.c_str(), COUNTER_TYPE_RATE, "statistic the qps of INCR request"); |
| |
| name = fmt::format("check_and_set_qps@{}", str_gpid); |
| _pfc_check_and_set_qps.init_app_counter("app.pegasus", |
| name.c_str(), |
| COUNTER_TYPE_RATE, |
| "statistic the qps of CHECK_AND_SET request"); |
| |
| name = fmt::format("check_and_mutate_qps@{}", str_gpid); |
| _pfc_check_and_mutate_qps.init_app_counter("app.pegasus", |
| name.c_str(), |
| COUNTER_TYPE_RATE, |
| "statistic the qps of CHECK_AND_MUTATE request"); |
| |
| name = fmt::format("put_latency@{}", str_gpid); |
| _pfc_put_latency.init_app_counter("app.pegasus", |
| name.c_str(), |
| COUNTER_TYPE_NUMBER_PERCENTILES, |
| "statistic the latency of PUT request"); |
| |
| name = fmt::format("multi_put_latency@{}", str_gpid); |
| _pfc_multi_put_latency.init_app_counter("app.pegasus", |
| name.c_str(), |
| COUNTER_TYPE_NUMBER_PERCENTILES, |
| "statistic the latency of MULTI_PUT request"); |
| |
| name = fmt::format("remove_latency@{}", str_gpid); |
| _pfc_remove_latency.init_app_counter("app.pegasus", |
| name.c_str(), |
| COUNTER_TYPE_NUMBER_PERCENTILES, |
| "statistic the latency of REMOVE request"); |
| |
| name = fmt::format("multi_remove_latency@{}", str_gpid); |
| _pfc_multi_remove_latency.init_app_counter("app.pegasus", |
| name.c_str(), |
| COUNTER_TYPE_NUMBER_PERCENTILES, |
| "statistic the latency of MULTI_REMOVE request"); |
| |
| name = fmt::format("incr_latency@{}", str_gpid); |
| _pfc_incr_latency.init_app_counter("app.pegasus", |
| name.c_str(), |
| COUNTER_TYPE_NUMBER_PERCENTILES, |
| "statistic the latency of INCR request"); |
| |
| name = fmt::format("check_and_set_latency@{}", str_gpid); |
| _pfc_check_and_set_latency.init_app_counter("app.pegasus", |
| name.c_str(), |
| COUNTER_TYPE_NUMBER_PERCENTILES, |
| "statistic the latency of CHECK_AND_SET request"); |
| |
| name = fmt::format("check_and_mutate_latency@{}", str_gpid); |
| _pfc_check_and_mutate_latency.init_app_counter( |
| "app.pegasus", |
| name.c_str(), |
| COUNTER_TYPE_NUMBER_PERCENTILES, |
| "statistic the latency of CHECK_AND_MUTATE request"); |
| |
| _pfc_duplicate_qps.init_app_counter("app.pegasus", |
| fmt::format("duplicate_qps@{}", str_gpid).c_str(), |
| COUNTER_TYPE_RATE, |
| "statistic the qps of DUPLICATE requests"); |
| |
| _pfc_dup_time_lag.init_app_counter( |
| "app.pegasus", |
| fmt::format("dup.time_lag_ms@{}", app_name()).c_str(), |
| COUNTER_TYPE_NUMBER_PERCENTILES, |
| "the time (in ms) lag between master and slave in the duplication"); |
| |
| _dup_lagging_write_threshold_ms = dsn_config_get_value_int64( |
| "pegasus.server", |
| "dup_lagging_write_threshold_ms", |
| 10 * 1000, |
| "If the duration that a write flows from master to slave is larger than this threshold, " |
| "the write is defined a lagging write."); |
| _pfc_dup_lagging_writes.init_app_counter( |
| "app.pegasus", |
| fmt::format("dup.lagging_writes@{}", app_name()).c_str(), |
| COUNTER_TYPE_VOLATILE_NUMBER, |
| "the number of lagging writes (time lag larger than `dup_lagging_write_threshold_ms`)"); |
| } |
| |
| pegasus_write_service::~pegasus_write_service() {} |
| |
| int pegasus_write_service::empty_put(int64_t decree) { return _impl->empty_put(decree); } |
| |
| int pegasus_write_service::multi_put(const db_write_context &ctx, |
| const dsn::apps::multi_put_request &update, |
| dsn::apps::update_response &resp) |
| { |
| uint64_t start_time = dsn_now_ns(); |
| _pfc_multi_put_qps->increment(); |
| int err = _impl->multi_put(ctx, update, resp); |
| |
| if (_server->is_primary()) { |
| _cu_calculator->add_multi_put_cu(resp.error, update.hash_key, update.kvs); |
| } |
| |
| _pfc_multi_put_latency->set(dsn_now_ns() - start_time); |
| return err; |
| } |
| |
| int pegasus_write_service::multi_remove(int64_t decree, |
| const dsn::apps::multi_remove_request &update, |
| dsn::apps::multi_remove_response &resp) |
| { |
| uint64_t start_time = dsn_now_ns(); |
| _pfc_multi_remove_qps->increment(); |
| int err = _impl->multi_remove(decree, update, resp); |
| |
| if (_server->is_primary()) { |
| _cu_calculator->add_multi_remove_cu(resp.error, update.hash_key, update.sort_keys); |
| } |
| |
| _pfc_multi_remove_latency->set(dsn_now_ns() - start_time); |
| return err; |
| } |
| |
| int pegasus_write_service::incr(int64_t decree, |
| const dsn::apps::incr_request &update, |
| dsn::apps::incr_response &resp) |
| { |
| uint64_t start_time = dsn_now_ns(); |
| _pfc_incr_qps->increment(); |
| int err = _impl->incr(decree, update, resp); |
| |
| if (_server->is_primary()) { |
| _cu_calculator->add_incr_cu(resp.error, update.key); |
| } |
| |
| _pfc_incr_latency->set(dsn_now_ns() - start_time); |
| return err; |
| } |
| |
| int pegasus_write_service::check_and_set(int64_t decree, |
| const dsn::apps::check_and_set_request &update, |
| dsn::apps::check_and_set_response &resp) |
| { |
| uint64_t start_time = dsn_now_ns(); |
| _pfc_check_and_set_qps->increment(); |
| int err = _impl->check_and_set(decree, update, resp); |
| |
| if (_server->is_primary()) { |
| _cu_calculator->add_check_and_set_cu(resp.error, |
| update.hash_key, |
| update.check_sort_key, |
| update.set_sort_key, |
| update.set_value); |
| } |
| |
| _pfc_check_and_set_latency->set(dsn_now_ns() - start_time); |
| return err; |
| } |
| |
| int pegasus_write_service::check_and_mutate(int64_t decree, |
| const dsn::apps::check_and_mutate_request &update, |
| dsn::apps::check_and_mutate_response &resp) |
| { |
| uint64_t start_time = dsn_now_ns(); |
| _pfc_check_and_mutate_qps->increment(); |
| int err = _impl->check_and_mutate(decree, update, resp); |
| |
| if (_server->is_primary()) { |
| _cu_calculator->add_check_and_mutate_cu( |
| resp.error, update.hash_key, update.check_sort_key, update.mutate_list); |
| } |
| |
| _pfc_check_and_mutate_latency->set(dsn_now_ns() - start_time); |
| return err; |
| } |
| |
| void pegasus_write_service::batch_prepare(int64_t decree) |
| { |
| dassert(_batch_start_time == 0, |
| "batch_prepare and batch_commit/batch_abort must be called in pair"); |
| |
| _batch_start_time = dsn_now_ns(); |
| } |
| |
| int pegasus_write_service::batch_put(const db_write_context &ctx, |
| const dsn::apps::update_request &update, |
| dsn::apps::update_response &resp) |
| { |
| dassert(_batch_start_time != 0, "batch_put must be called after batch_prepare"); |
| |
| _batch_qps_perfcounters.push_back(_pfc_put_qps.get()); |
| _batch_latency_perfcounters.push_back(_pfc_put_latency.get()); |
| int err = _impl->batch_put(ctx, update, resp); |
| |
| if (_server->is_primary()) { |
| _cu_calculator->add_put_cu(resp.error, update.key, update.value); |
| } |
| |
| return err; |
| } |
| |
| int pegasus_write_service::batch_remove(int64_t decree, |
| const dsn::blob &key, |
| dsn::apps::update_response &resp) |
| { |
| dassert(_batch_start_time != 0, "batch_remove must be called after batch_prepare"); |
| |
| _batch_qps_perfcounters.push_back(_pfc_remove_qps.get()); |
| _batch_latency_perfcounters.push_back(_pfc_remove_latency.get()); |
| int err = _impl->batch_remove(decree, key, resp); |
| |
| if (_server->is_primary()) { |
| _cu_calculator->add_remove_cu(resp.error, key); |
| } |
| |
| return err; |
| } |
| |
| int pegasus_write_service::batch_commit(int64_t decree) |
| { |
| dassert(_batch_start_time != 0, "batch_commit must be called after batch_prepare"); |
| |
| int err = _impl->batch_commit(decree); |
| clear_up_batch_states(); |
| return err; |
| } |
| |
| void pegasus_write_service::batch_abort(int64_t decree, int err) |
| { |
| dassert(_batch_start_time != 0, "batch_abort must be called after batch_prepare"); |
| dassert(err, "must abort on non-zero err"); |
| |
| _impl->batch_abort(decree, err); |
| clear_up_batch_states(); |
| } |
| |
| void pegasus_write_service::set_default_ttl(uint32_t ttl) { _impl->set_default_ttl(ttl); } |
| |
| void pegasus_write_service::clear_up_batch_states() |
| { |
| uint64_t latency = dsn_now_ns() - _batch_start_time; |
| for (dsn::perf_counter *pfc : _batch_qps_perfcounters) |
| pfc->increment(); |
| for (dsn::perf_counter *pfc : _batch_latency_perfcounters) |
| pfc->set(latency); |
| |
| _batch_qps_perfcounters.clear(); |
| _batch_latency_perfcounters.clear(); |
| _batch_start_time = 0; |
| } |
| |
| int pegasus_write_service::duplicate(int64_t decree, |
| const dsn::apps::duplicate_request &request, |
| dsn::apps::duplicate_response &resp) |
| { |
| // Verifies the cluster_id. |
| if (!dsn::replication::is_cluster_id_configured(request.cluster_id)) { |
| resp.__set_error(rocksdb::Status::kInvalidArgument); |
| resp.__set_error_hint("request cluster id is unconfigured"); |
| return empty_put(decree); |
| } |
| if (request.cluster_id == get_current_cluster_id()) { |
| resp.__set_error(rocksdb::Status::kInvalidArgument); |
| resp.__set_error_hint("self-duplicating"); |
| return empty_put(decree); |
| } |
| |
| _pfc_duplicate_qps->increment(); |
| auto cleanup = dsn::defer([this, &request]() { |
| uint64_t latency_ms = (dsn_now_us() - request.timestamp) / 1000; |
| if (latency_ms > _dup_lagging_write_threshold_ms) { |
| _pfc_dup_lagging_writes->increment(); |
| } |
| _pfc_dup_time_lag->set(latency_ms); |
| }); |
| dsn::message_ex *write = dsn::from_blob_to_received_msg(request.task_code, request.raw_message); |
| bool is_delete = request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE || |
| request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE; |
| auto remote_timetag = generate_timetag(request.timestamp, request.cluster_id, is_delete); |
| auto ctx = db_write_context::create_duplicate(decree, remote_timetag, request.verify_timetag); |
| |
| if (request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) { |
| multi_put_rpc rpc(write); |
| resp.__set_error(_impl->multi_put(ctx, rpc.request(), rpc.response())); |
| return resp.error; |
| } |
| if (request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) { |
| multi_remove_rpc rpc(write); |
| resp.__set_error(_impl->multi_remove(ctx.decree, rpc.request(), rpc.response())); |
| return resp.error; |
| } |
| put_rpc put; |
| remove_rpc remove; |
| if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT || |
| request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) { |
| int err = 0; |
| if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT) { |
| put = put_rpc(write); |
| err = _impl->batch_put(ctx, put.request(), put.response()); |
| } |
| if (request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) { |
| remove = remove_rpc(write); |
| err = _impl->batch_remove(ctx.decree, remove.request(), remove.response()); |
| } |
| if (!err) { |
| err = _impl->batch_commit(ctx.decree); |
| } else { |
| _impl->batch_abort(ctx.decree, err); |
| } |
| resp.__set_error(err); |
| return resp.error; |
| } |
| resp.__set_error(rocksdb::Status::kInvalidArgument); |
| resp.__set_error_hint(fmt::format("unrecognized task code {}", request.task_code)); |
| return empty_put(ctx.decree); |
| } |
| |
| int pegasus_write_service::ingestion_files(int64_t decree, |
| const dsn::replication::ingestion_request &req, |
| dsn::replication::ingestion_response &resp) |
| { |
| // TODO(heyuchen): consider cu |
| |
| resp.err = dsn::ERR_OK; |
| // write empty put to flush decree |
| resp.rocksdb_error = empty_put(decree); |
| if (resp.rocksdb_error != 0) { |
| resp.err = dsn::ERR_TRY_AGAIN; |
| return resp.rocksdb_error; |
| } |
| |
| // ingest files asynchronously |
| _server->set_ingestion_status(dsn::replication::ingestion_status::IS_RUNNING); |
| dsn::tasking::enqueue(LPC_INGESTION, &_server->_tracker, [this, decree, req]() { |
| dsn::error_code err = |
| _impl->ingestion_files(decree, _server->bulk_load_dir(), req.metadata); |
| if (err == dsn::ERR_OK) { |
| _server->set_ingestion_status(dsn::replication::ingestion_status::IS_SUCCEED); |
| } else { |
| _server->set_ingestion_status(dsn::replication::ingestion_status::IS_FAILED); |
| } |
| }); |
| return rocksdb::Status::kOk; |
| } |
| |
| } // namespace server |
| } // namespace pegasus |