blob: f430b48b0c70b30bc22ee7302fff873da2b22dbb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <cstdint>
#include <memory>
#include "replica/replica_base.h"
#include "utils/metrics.h"
// Forward declarations only: every type listed here appears in this header solely
// as a reference parameter, so the full definitions are not required and the
// corresponding (generated) headers do not have to be included.
namespace dsn {
class blob;
namespace apps {
class check_and_mutate_request;
class check_and_mutate_response;
class check_and_set_request;
class check_and_set_response;
class duplicate_request;
class duplicate_response;
class incr_request;
class incr_response;
class multi_put_request;
class multi_remove_request;
class multi_remove_response;
class update_request;
class update_response;
} // namespace apps
namespace replication {
class ingestion_request;
class ingestion_response;
} // namespace replication
} // namespace dsn
namespace pegasus {
namespace server {
// Context describing a single mutation that is applied to the database.
struct db_write_context
{
    // Decree of the mutation.
    int64_t decree{0};

    // Generation time of the mutation; the new timetag is calculated from it.
    uint64_t timestamp{0};

    // Timetag carried by a remote (duplicated) write. Stays 0 when the write does
    // not come from a remote cluster.
    uint64_t remote_timetag{0};

    // Controls whether the timetag of the old value is compared with the one of
    // the new write.
    // - true:  the DB must be read before writing. When the stored record has a
    //          larger timetag than `remote_timetag` the write is ignored,
    //          otherwise it is applied.
    // - false: the write has no extra overhead, but eventual consistency on
    //          duplication is not guaranteed.
    //
    // Only meaningful for duplicated writes. Under causal consistency the former
    // duplicated write must **happen before** the latest local write, no matter
    // whether its timestamp is larger. The relationship is easy to prove:
    // ```
    // T1(put "a") > T2(duplicated put "b") > T3(put "b" in remote cluster)
    // ```
    // Write conflicts may nevertheless leave different clusters with different
    // data, although each of those "versions" can be regarded as the latest one.
    // Users who need consistency can read from a single main cluster instead of
    // reading from several.
    bool verify_timetag{false};

    // Creates a context for normal write.
    static inline db_write_context create(int64_t decree, uint64_t timestamp)
    {
        db_write_context local_write;
        local_write.timestamp = timestamp;
        local_write.decree = decree;
        return local_write;
    }

    // Creates a context that only carries the decree `d`; the timestamp is 0.
    static inline db_write_context empty(int64_t d)
    {
        const uint64_t no_timestamp = 0;
        return create(d, no_timestamp);
    }

    // Creates a context for duplicated write.
    static inline db_write_context
    create_duplicate(int64_t decree, uint64_t remote_timetag, bool verify_timetag)
    {
        db_write_context dup_write;
        dup_write.verify_timetag = verify_timetag;
        dup_write.remote_timetag = remote_timetag;
        dup_write.decree = decree;
        return dup_write;
    }

    // A write counts as duplicated as soon as it carries a non-zero remote timetag.
    bool is_duplicated_write() const { return remote_timetag != 0; }
};
class capacity_unit_calculator;
class pegasus_server_impl;
/// Handles the write requests.
/// As the signatures imply, this class is not responsible for replying to the rpc;
/// the caller (pegasus_server_write) should do that.
///
/// The interface has two groups of methods:
/// - one-shot writes (empty_put, multi_put, multi_remove, incr, check_and_set,
///   check_and_mutate, duplicate, ingest_files), each taking the mutation decree (or a
///   db_write_context) together with a request and the response to fill in;
/// - the batch API (batch_prepare / batch_put / batch_remove / batch_commit /
///   batch_abort) used for PUT and REMOVE records.
///
/// NOTE: no access specifier is given for the base class, so replica_base is inherited
/// privately (the default for `class`).
/// \see pegasus::server::pegasus_server_write::on_batched_write_requests
class pegasus_write_service : dsn::replication::replica_base
{
public:
// `server` is stored as a raw pointer.
// NOTE(review): presumably non-owning and expected to outlive this object - confirm.
explicit pegasus_write_service(pegasus_server_impl *server);
// Declared here and defined out of line: `impl` is an incomplete type in this header,
// so the std::unique_ptr<impl> member cannot be destroyed from an inline destructor.
~pegasus_write_service();
// Write empty record.
// See this document (https://pegasus.apache.org/zh/2018/03/07/last_flushed_decree.html)
// to know why we must have empty write.
// NOTE(review): the int result is presumably a rocksdb::Status::Code, like the batch
// methods below - confirm in the implementation. The same applies to the other
// one-shot write methods.
int empty_put(int64_t decree);
// Write MULTI_PUT record.
// Takes a full db_write_context (not just the decree), and reports the result through
// an update_response.
int multi_put(const db_write_context &ctx,
const dsn::apps::multi_put_request &update,
dsn::apps::update_response &resp);
// Write MULTI_REMOVE record.
int multi_remove(int64_t decree,
const dsn::apps::multi_remove_request &update,
dsn::apps::multi_remove_response &resp);
// Write INCR record.
int incr(int64_t decree, const dsn::apps::incr_request &update, dsn::apps::incr_response &resp);
// Write CHECK_AND_SET record.
int check_and_set(int64_t decree,
const dsn::apps::check_and_set_request &update,
dsn::apps::check_and_set_response &resp);
// Write CHECK_AND_MUTATE record.
int check_and_mutate(int64_t decree,
const dsn::apps::check_and_mutate_request &update,
dsn::apps::check_and_mutate_response &resp);
// Handles DUPLICATE duplicated from remote.
int duplicate(int64_t decree,
const dsn::apps::duplicate_request &update,
dsn::apps::duplicate_response &resp);
// Execute bulk load ingestion
int ingest_files(int64_t decree,
const dsn::replication::ingestion_request &req,
dsn::replication::ingestion_response &resp);
/// For batch write.
/// NOTE(review): the expected call sequence is presumably batch_prepare, then any number
/// of batch_put / batch_remove, then either batch_commit or batch_abort - confirm
/// against the caller.
// Prepare batch write.
void batch_prepare(int64_t decree);
// Add PUT record in batch write.
// \returns rocksdb::Status::Code.
// NOTE that `resp` should not be moved or freed while the batch is not committed.
int batch_put(const db_write_context &ctx,
const dsn::apps::update_request &update,
dsn::apps::update_response &resp);
// Add REMOVE record in batch write.
// \returns rocksdb::Status::Code.
// NOTE that `resp` should not be moved or freed while the batch is not committed.
int batch_remove(int64_t decree, const dsn::blob &key, dsn::apps::update_response &resp);
// Commit batch write.
// \returns rocksdb::Status::Code.
// NOTE that if the batch contains no updates, rocksdb::Status::kOk is returned.
int batch_commit(int64_t decree);
// Abort batch write.
// NOTE(review): `err` is presumably the rocksdb::Status::Code that caused the abort -
// confirm against the caller.
void batch_abort(int64_t decree, int err);
// Sets the default TTL.
// NOTE(review): the unit of `ttl` is not visible in this header (presumably seconds) -
// confirm in the implementation.
void set_default_ttl(uint32_t ttl);
private:
// NOTE(review): presumably resets the per-batch bookkeeping once a batch is committed
// or aborted - confirm in the implementation.
void clear_up_batch_states();
private:
// Test fixtures that are granted access to the private members.
friend class pegasus_write_service_test;
friend class pegasus_write_service_impl_test;
friend class pegasus_server_write_test;
friend class rocksdb_wrapper_test;
// The server instance handed to the constructor.
pegasus_server_impl *_server;
// The implementation class is only declared here; its definition lives outside of this
// header.
class impl;
std::unique_ptr<impl> _impl;
// NOTE(review): presumably the time at which the current batch was prepared, used for
// the latency metrics - confirm. It has no in-class initializer, so it must be set by
// the constructor.
uint64_t _batch_start_time;
// NOTE(review): presumably non-owning - confirm where it is assigned.
capacity_unit_calculator *_cu_calculator;
// Metric variables, declared through the macros from utils/metrics.h.
// Request counters, one per kind of write request.
METRIC_VAR_DECLARE_counter(put_requests);
METRIC_VAR_DECLARE_counter(multi_put_requests);
METRIC_VAR_DECLARE_counter(remove_requests);
METRIC_VAR_DECLARE_counter(multi_remove_requests);
METRIC_VAR_DECLARE_counter(incr_requests);
METRIC_VAR_DECLARE_counter(check_and_set_requests);
METRIC_VAR_DECLARE_counter(check_and_mutate_requests);
// Latency percentiles, one per kind of write request (nanoseconds, going by the `_ns`
// suffix).
METRIC_VAR_DECLARE_percentile_int64(put_latency_ns);
METRIC_VAR_DECLARE_percentile_int64(multi_put_latency_ns);
METRIC_VAR_DECLARE_percentile_int64(remove_latency_ns);
METRIC_VAR_DECLARE_percentile_int64(multi_remove_latency_ns);
METRIC_VAR_DECLARE_percentile_int64(incr_latency_ns);
METRIC_VAR_DECLARE_percentile_int64(check_and_set_latency_ns);
METRIC_VAR_DECLARE_percentile_int64(check_and_mutate_latency_ns);
// Duplication metrics: request count, time lag (milliseconds, going by the `_ms`
// suffix) and the number of lagging writes.
METRIC_VAR_DECLARE_counter(dup_requests);
METRIC_VAR_DECLARE_percentile_int64(dup_time_lag_ms);
METRIC_VAR_DECLARE_counter(dup_lagging_writes);
// Record batch size for put and remove requests.
// Neither member has an in-class initializer, so both must be set by the constructor
// before they are read.
uint32_t _put_batch_size;
uint32_t _remove_batch_size;
// TODO(wutao1): add metrics for failed rpc.
};
} // namespace server
} // namespace pegasus