blob: d688ab6e4533c024db155cd656248917d468b1b7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <absl/strings/string_view.h>
#include <fmt/core.h>
#include <rocksdb/status.h>
#include <stddef.h>
#include <functional>
#include <set>
#include <vector>
#include "base/pegasus_rpc_types.h"
#include "bulk_load_types.h"
#include "capacity_unit_calculator.h"
#include "common/duplication_common.h"
#include "common/replication.codes.h"
#include "duplication_internal_types.h"
#include "pegasus_value_schema.h"
#include "pegasus_write_service.h"
#include "pegasus_write_service_impl.h"
#include "rrdb/rrdb.code.definition.h"
#include "rrdb/rrdb_types.h"
#include "runtime/api_layer1.h"
#include "runtime/message_utils.h"
#include "runtime/task/async_calls.h"
#include "runtime/task/task_code.h"
#include "server/pegasus_server_impl.h"
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
METRIC_DEFINE_counter(replica,
put_requests,
dsn::metric_unit::kRequests,
"The number of PUT requests");
METRIC_DEFINE_counter(replica,
multi_put_requests,
dsn::metric_unit::kRequests,
"The number of MULTI_PUT requests");
METRIC_DEFINE_counter(replica,
remove_requests,
dsn::metric_unit::kRequests,
"The number of REMOVE requests");
METRIC_DEFINE_counter(replica,
multi_remove_requests,
dsn::metric_unit::kRequests,
"The number of MULTI_REMOVE requests");
METRIC_DEFINE_counter(replica,
incr_requests,
dsn::metric_unit::kRequests,
"The number of INCR requests");
METRIC_DEFINE_counter(replica,
check_and_set_requests,
dsn::metric_unit::kRequests,
"The number of CHECK_AND_SET requests");
METRIC_DEFINE_counter(replica,
check_and_mutate_requests,
dsn::metric_unit::kRequests,
"The number of CHECK_AND_MUTATE requests");
METRIC_DEFINE_percentile_int64(replica,
put_latency_ns,
dsn::metric_unit::kNanoSeconds,
"The latency of PUT requests");
METRIC_DEFINE_percentile_int64(replica,
multi_put_latency_ns,
dsn::metric_unit::kNanoSeconds,
"The latency of MULTI_PUT requests");
METRIC_DEFINE_percentile_int64(replica,
remove_latency_ns,
dsn::metric_unit::kNanoSeconds,
"The latency of REMOVE requests");
METRIC_DEFINE_percentile_int64(replica,
multi_remove_latency_ns,
dsn::metric_unit::kNanoSeconds,
"The latency of MULTI_REMOVE requests");
METRIC_DEFINE_percentile_int64(replica,
incr_latency_ns,
dsn::metric_unit::kNanoSeconds,
"The latency of INCR requests");
METRIC_DEFINE_percentile_int64(replica,
check_and_set_latency_ns,
dsn::metric_unit::kNanoSeconds,
"The latency of CHECK_AND_SET requests");
METRIC_DEFINE_percentile_int64(replica,
check_and_mutate_latency_ns,
dsn::metric_unit::kNanoSeconds,
"The latency of CHECK_AND_MUTATE requests");
METRIC_DEFINE_counter(replica,
dup_requests,
dsn::metric_unit::kRequests,
"The number of DUPLICATE requests");
METRIC_DEFINE_percentile_int64(replica,
dup_time_lag_ms,
dsn::metric_unit::kMilliSeconds,
"the time lag (in ms) between master and slave in the duplication");
METRIC_DEFINE_counter(
replica,
dup_lagging_writes,
dsn::metric_unit::kRequests,
"the number of lagging writes (time lag larger than `dup_lagging_write_threshold_ms`)");
DSN_DEFINE_int64(pegasus.server,
dup_lagging_write_threshold_ms,
10 * 1000,
"If the duration that a write flows from master to slave is larger than this "
"threshold, the write is defined a lagging write.");
namespace dsn {
class blob;
class message_ex;
} // namespace dsn
namespace pegasus {
namespace server {
DEFINE_TASK_CODE(LPC_INGESTION, TASK_PRIORITY_COMMON, THREAD_POOL_INGESTION)
pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
: replica_base(server),
_server(server),
_impl(new impl(server)),
_batch_start_time(0),
_cu_calculator(server->_cu_calculator.get()),
METRIC_VAR_INIT_replica(put_requests),
METRIC_VAR_INIT_replica(multi_put_requests),
METRIC_VAR_INIT_replica(remove_requests),
METRIC_VAR_INIT_replica(multi_remove_requests),
METRIC_VAR_INIT_replica(incr_requests),
METRIC_VAR_INIT_replica(check_and_set_requests),
METRIC_VAR_INIT_replica(check_and_mutate_requests),
METRIC_VAR_INIT_replica(put_latency_ns),
METRIC_VAR_INIT_replica(multi_put_latency_ns),
METRIC_VAR_INIT_replica(remove_latency_ns),
METRIC_VAR_INIT_replica(multi_remove_latency_ns),
METRIC_VAR_INIT_replica(incr_latency_ns),
METRIC_VAR_INIT_replica(check_and_set_latency_ns),
METRIC_VAR_INIT_replica(check_and_mutate_latency_ns),
METRIC_VAR_INIT_replica(dup_requests),
METRIC_VAR_INIT_replica(dup_time_lag_ms),
METRIC_VAR_INIT_replica(dup_lagging_writes),
_put_batch_size(0),
_remove_batch_size(0)
{
}
pegasus_write_service::~pegasus_write_service() {}
int pegasus_write_service::empty_put(int64_t decree) { return _impl->empty_put(decree); }
int pegasus_write_service::multi_put(const db_write_context &ctx,
const dsn::apps::multi_put_request &update,
dsn::apps::update_response &resp)
{
METRIC_VAR_AUTO_LATENCY(multi_put_latency_ns);
METRIC_VAR_INCREMENT(multi_put_requests);
int err = _impl->multi_put(ctx, update, resp);
if (_server->is_primary()) {
_cu_calculator->add_multi_put_cu(resp.error, update.hash_key, update.kvs);
}
return err;
}
int pegasus_write_service::multi_remove(int64_t decree,
const dsn::apps::multi_remove_request &update,
dsn::apps::multi_remove_response &resp)
{
METRIC_VAR_AUTO_LATENCY(multi_remove_latency_ns);
METRIC_VAR_INCREMENT(multi_remove_requests);
int err = _impl->multi_remove(decree, update, resp);
if (_server->is_primary()) {
_cu_calculator->add_multi_remove_cu(resp.error, update.hash_key, update.sort_keys);
}
return err;
}
int pegasus_write_service::incr(int64_t decree,
const dsn::apps::incr_request &update,
dsn::apps::incr_response &resp)
{
METRIC_VAR_AUTO_LATENCY(incr_latency_ns);
METRIC_VAR_INCREMENT(incr_requests);
int err = _impl->incr(decree, update, resp);
if (_server->is_primary()) {
_cu_calculator->add_incr_cu(resp.error, update.key);
}
return err;
}
int pegasus_write_service::check_and_set(int64_t decree,
const dsn::apps::check_and_set_request &update,
dsn::apps::check_and_set_response &resp)
{
METRIC_VAR_AUTO_LATENCY(check_and_set_latency_ns);
METRIC_VAR_INCREMENT(check_and_set_requests);
int err = _impl->check_and_set(decree, update, resp);
if (_server->is_primary()) {
_cu_calculator->add_check_and_set_cu(resp.error,
update.hash_key,
update.check_sort_key,
update.set_sort_key,
update.set_value);
}
return err;
}
int pegasus_write_service::check_and_mutate(int64_t decree,
const dsn::apps::check_and_mutate_request &update,
dsn::apps::check_and_mutate_response &resp)
{
METRIC_VAR_AUTO_LATENCY(check_and_mutate_latency_ns);
METRIC_VAR_INCREMENT(check_and_mutate_requests);
int err = _impl->check_and_mutate(decree, update, resp);
if (_server->is_primary()) {
_cu_calculator->add_check_and_mutate_cu(
resp.error, update.hash_key, update.check_sort_key, update.mutate_list);
}
return err;
}
void pegasus_write_service::batch_prepare(int64_t decree)
{
CHECK_EQ_MSG(
_batch_start_time, 0, "batch_prepare and batch_commit/batch_abort must be called in pair");
_batch_start_time = dsn_now_ns();
}
int pegasus_write_service::batch_put(const db_write_context &ctx,
const dsn::apps::update_request &update,
dsn::apps::update_response &resp)
{
CHECK_GT_MSG(_batch_start_time, 0, "batch_put must be called after batch_prepare");
++_put_batch_size;
int err = _impl->batch_put(ctx, update, resp);
if (_server->is_primary()) {
_cu_calculator->add_put_cu(resp.error, update.key, update.value);
}
return err;
}
int pegasus_write_service::batch_remove(int64_t decree,
const dsn::blob &key,
dsn::apps::update_response &resp)
{
CHECK_GT_MSG(_batch_start_time, 0, "batch_remove must be called after batch_prepare");
++_remove_batch_size;
int err = _impl->batch_remove(decree, key, resp);
if (_server->is_primary()) {
_cu_calculator->add_remove_cu(resp.error, key);
}
return err;
}
int pegasus_write_service::batch_commit(int64_t decree)
{
CHECK_GT_MSG(_batch_start_time, 0, "batch_commit must be called after batch_prepare");
int err = _impl->batch_commit(decree);
clear_up_batch_states();
return err;
}
void pegasus_write_service::batch_abort(int64_t decree, int err)
{
CHECK_GT_MSG(_batch_start_time, 0, "batch_abort must be called after batch_prepare");
CHECK(err, "must abort on non-zero err");
_impl->batch_abort(decree, err);
clear_up_batch_states();
}
void pegasus_write_service::set_default_ttl(uint32_t ttl) { _impl->set_default_ttl(ttl); }
void pegasus_write_service::clear_up_batch_states()
{
#define PROCESS_WRITE_BATCH(op) \
do { \
METRIC_VAR_INCREMENT_BY(op##_requests, static_cast<int64_t>(_##op##_batch_size)); \
METRIC_VAR_SET(op##_latency_ns, static_cast<size_t>(_##op##_batch_size), latency_ns); \
_##op##_batch_size = 0; \
} while (0)
auto latency_ns = static_cast<int64_t>(dsn_now_ns() - _batch_start_time);
PROCESS_WRITE_BATCH(put);
PROCESS_WRITE_BATCH(remove);
_batch_start_time = 0;
#undef PROCESS_WRITE_BATCH
}
int pegasus_write_service::duplicate(int64_t decree,
const dsn::apps::duplicate_request &requests,
dsn::apps::duplicate_response &resp)
{
// Verifies the cluster_id.
for (const auto &request : requests.entries) {
if (!dsn::replication::is_dup_cluster_id_configured(request.cluster_id)) {
resp.__set_error(rocksdb::Status::kInvalidArgument);
resp.__set_error_hint("request cluster id is unconfigured");
return empty_put(decree);
}
if (request.cluster_id == dsn::replication::get_current_dup_cluster_id()) {
resp.__set_error(rocksdb::Status::kInvalidArgument);
resp.__set_error_hint("self-duplicating");
return empty_put(decree);
}
METRIC_VAR_INCREMENT(dup_requests);
METRIC_VAR_AUTO_LATENCY(
dup_time_lag_ms, request.timestamp * 1000, [this](uint64_t latency) {
if (latency > FLAGS_dup_lagging_write_threshold_ms) {
METRIC_VAR_INCREMENT(dup_lagging_writes);
}
});
dsn::message_ex *write =
dsn::from_blob_to_received_msg(request.task_code, request.raw_message);
bool is_delete = request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE ||
request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE;
auto remote_timetag = generate_timetag(request.timestamp, request.cluster_id, is_delete);
auto ctx =
db_write_context::create_duplicate(decree, remote_timetag, request.verify_timetag);
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_PUT) {
multi_put_rpc rpc(write);
resp.__set_error(_impl->multi_put(ctx, rpc.request(), rpc.response()));
if (resp.error != rocksdb::Status::kOk) {
return resp.error;
}
continue;
}
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE) {
multi_remove_rpc rpc(write);
resp.__set_error(_impl->multi_remove(ctx.decree, rpc.request(), rpc.response()));
if (resp.error != rocksdb::Status::kOk) {
return resp.error;
}
continue;
}
put_rpc put;
remove_rpc remove;
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT ||
request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
int err = rocksdb::Status::kOk;
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
put = put_rpc(write);
err = _impl->batch_put(ctx, put.request(), put.response());
}
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
remove = remove_rpc(write);
err = _impl->batch_remove(ctx.decree, remove.request(), remove.response());
}
if (!err) {
err = _impl->batch_commit(ctx.decree);
} else {
_impl->batch_abort(ctx.decree, err);
}
resp.__set_error(err);
if (resp.error != rocksdb::Status::kOk) {
return resp.error;
}
continue;
}
resp.__set_error(rocksdb::Status::kInvalidArgument);
resp.__set_error_hint(fmt::format("unrecognized task code {}", request.task_code));
return empty_put(ctx.decree);
}
return resp.error;
}
int pegasus_write_service::ingest_files(int64_t decree,
const dsn::replication::ingestion_request &req,
dsn::replication::ingestion_response &resp)
{
// TODO(heyuchen): consider cu
resp.err = dsn::ERR_OK;
// write empty put to flush decree
resp.rocksdb_error = empty_put(decree);
if (resp.rocksdb_error != rocksdb::Status::kOk) {
resp.err = dsn::ERR_TRY_AGAIN;
return resp.rocksdb_error;
}
// ingest files asynchronously
_server->set_ingestion_status(dsn::replication::ingestion_status::IS_RUNNING);
dsn::tasking::enqueue(LPC_INGESTION, &_server->_tracker, [this, decree, req]() {
const auto &err =
_impl->ingest_files(decree, _server->bulk_load_dir(), req, _server->get_ballot());
auto status = dsn::replication::ingestion_status::IS_SUCCEED;
if (err == dsn::ERR_INVALID_VERSION) {
status = dsn::replication::ingestion_status::IS_INVALID;
} else if (err != dsn::ERR_OK) {
status = dsn::replication::ingestion_status::IS_FAILED;
}
_server->set_ingestion_status(status);
});
return rocksdb::Status::kOk;
}
} // namespace server
} // namespace pegasus