// blob: 9f4fc4aaff86ea657e87f2685aa37573cfb95ea9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <dsn/cpp/message_utils.h>
#include <dsn/dist/replication/duplication_common.h>
#include <dsn/utility/defer.h>
#include "base/pegasus_key_schema.h"
#include "pegasus_server_write.h"
#include "pegasus_server_impl.h"
#include "logging_utils.h"
#include "pegasus_mutation_duplicator.h"
namespace pegasus {
namespace server {
// Constructs the write handler for `server`.
//
// The replica_base part is initialized from `server`, a pegasus_write_service
// bound to the same server is created to carry out the writes, and
// `verbose_log` is remembered to decide whether request_key_check() logs
// every written key.
pegasus_server_write::pegasus_server_write(pegasus_server_impl *server, bool verbose_log)
    : replica_base(server),
      _write_svc(new pegasus_write_service(server)),
      _verbose_log(verbose_log)
{
    // Populate the rpc-code -> handler table consulted by
    // on_batched_write_requests().
    init_non_batch_write_handlers();
}
// Entry point for applying the write requests that share one decree.
//
// `requests` / `count` describe the incoming messages, `decree` and
// `timestamp` are recorded as the current write context before dispatching.
// Dispatch rules:
//   - count == 0: an empty record is written for the decree;
//   - the first request's rpc code has a non-batch handler: that handler is
//     invoked on the single request (count is asserted to be 1);
//   - otherwise: all requests are applied as one batch.
// Returns the value produced by whichever path was taken.
int pegasus_server_write::on_batched_write_requests(dsn::message_ex **requests,
                                                    int count,
                                                    int64_t decree,
                                                    uint64_t timestamp)
{
    _decree = decree;
    _write_ctx = db_write_context::create(decree, timestamp);

    // Write down empty record (RPC_REPLICATION_WRITE_EMPTY) to update
    // rocksdb's `last_flushed_decree` (see rocksdb::DB::GetLastFlushedDecree())
    // TODO(wutao1): remove it when shared log is removed.
    if (count == 0) {
        return _write_svc->empty_put(_decree);
    }

    // Only the first request is inspected: a non-batchable rpc is expected
    // to arrive alone, which the assertion below enforces.
    dsn::message_ex *first_request = requests[0];
    auto handler_it = _non_batch_write_handlers.find(first_request->rpc_code());
    if (handler_it == _non_batch_write_handlers.end()) {
        return on_batched_writes(requests, count);
    }

    dassert_f(count == 1, "count = {}", count);
    return handler_it->second(first_request);
}
// Forwards the default TTL `ttl` to the underlying write service.
void pegasus_server_write::set_default_ttl(uint32_t ttl)
{
    _write_svc->set_default_ttl(ttl);
}
// Applies `count` PUT / REMOVE requests as a single batch for the current
// decree (`_decree`).
//
// Every request is added to the batch even after an earlier one has failed;
// the first non-zero error is remembered. When no request failed the batch is
// committed and the commit result is returned, otherwise the batch is aborted
// with the remembered error and that error is returned.
int pegasus_server_write::on_batched_writes(dsn::message_ex **requests, int count)
{
    _write_svc->batch_prepare(_decree);

    int err = 0;
    for (int i = 0; i < count; ++i) {
        dassert(requests[i] != nullptr, "request[%d] is null", i);
        dsn::message_ex *request = requests[i];

        // Make sure all writes are batched even if they are failed,
        // since we need to record the total qps and rpc latencies,
        // and respond for all RPCs regardless of their result.
        int request_err = 0;
        dsn::task_code rpc_code(request->rpc_code());
        if (rpc_code == dsn::apps::RPC_RRDB_RRDB_PUT) {
            auto rpc = put_rpc::auto_reply(request);
            request_err = on_single_put_in_batch(rpc);
            _put_rpc_batch.emplace_back(std::move(rpc));
        } else if (rpc_code == dsn::apps::RPC_RRDB_RRDB_REMOVE) {
            auto rpc = remove_rpc::auto_reply(request);
            request_err = on_single_remove_in_batch(rpc);
            _remove_rpc_batch.emplace_back(std::move(rpc));
        } else {
            // NOTE(review): this branch only logs; the request is neither
            // added to a batch nor counted as an error here -- confirm that
            // this is intended.
            const bool has_non_batch_handler =
                _non_batch_write_handlers.find(rpc_code) != _non_batch_write_handlers.end();
            if (has_non_batch_handler) {
                dfatal_f("rpc code not allow batch: {}", rpc_code.to_string());
            } else {
                dfatal_f("rpc code not handled: {}", rpc_code.to_string());
            }
        }

        // Keep only the first error seen; later results never overwrite it.
        if (err == 0) {
            err = request_err;
        }
    }

    if (err != 0) {
        _write_svc->batch_abort(_decree, err);
    } else {
        err = _write_svc->batch_commit(_decree);
    }

    // reply the batched RPCs
    // (presumably the auto_reply holders send their responses when they are
    // destroyed by clear() -- confirm against the rpc holder implementation)
    _put_rpc_batch.clear();
    _remove_rpc_batch.clear();
    return err;
}
// Sanity-checks the routing hashes carried by `msg` against `key`, and
// optionally logs the write.
//
// When the client-supplied partition hash is non-zero, it must equal the hash
// computed from `key`, and the client-supplied thread hash must equal this
// replica's gpid thread hash; a mismatch trips an assertion. A zero partition
// hash skips both checks (presumably it means "not set" -- confirm with the
// client protocol).
//
// When verbose logging is enabled, `key` is split into hash key and sort key
// and logged, escaped, together with `decree` and the message's rpc code.
void pegasus_server_write::request_key_check(int64_t decree,
                                             dsn::message_ex *msg,
                                             const dsn::blob &key)
{
    // TODO(wutao1): server should not assert when client's hash is incorrect.
    const bool client_sent_hash = msg->header->client.partition_hash != 0;
    if (client_sent_hash) {
        const uint64_t partition_hash = pegasus_key_hash(key);
        dassert(msg->header->client.partition_hash == partition_hash,
                "inconsistent partition hash");

        const int thread_hash = get_gpid().thread_hash();
        dassert(msg->header->client.thread_hash == thread_hash, "inconsistent thread hash");
    }

    if (!_verbose_log) {
        return;
    }

    // Recover the two key parts so they can be printed separately.
    ::dsn::blob hash_key;
    ::dsn::blob sort_key;
    pegasus_restore_key(key, hash_key, sort_key);

    ddebug_rocksdb("Write",
                   "decree: {}, code: {}, hash_key: {}, sort_key: {}",
                   decree,
                   msg->local_rpc_code.to_string(),
                   utils::c_escape_string(hash_key),
                   utils::c_escape_string(sort_key));
}
void pegasus_server_write::init_non_batch_write_handlers()
{
_non_batch_write_handlers = {
{dsn::apps::RPC_RRDB_RRDB_MULTI_PUT,
[this](dsn::message_ex *request) -> int {
auto rpc = multi_put_rpc::auto_reply(request);
return _write_svc->multi_put(_write_ctx, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_MULTI_REMOVE,
[this](dsn::message_ex *request) -> int {
auto rpc = multi_remove_rpc::auto_reply(request);
return _write_svc->multi_remove(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_INCR,
[this](dsn::message_ex *request) -> int {
auto rpc = incr_rpc::auto_reply(request);
return _write_svc->incr(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_DUPLICATE,
[this](dsn::message_ex *request) -> int {
auto rpc = duplicate_rpc::auto_reply(request);
return _write_svc->duplicate(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET,
[this](dsn::message_ex *request) -> int {
auto rpc = check_and_set_rpc::auto_reply(request);
return _write_svc->check_and_set(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE,
[this](dsn::message_ex *request) -> int {
auto rpc = check_and_mutate_rpc::auto_reply(request);
return _write_svc->check_and_mutate(_decree, rpc.request(), rpc.response());
}},
{dsn::apps::RPC_RRDB_RRDB_BULK_LOAD,
[this](dsn::message_ex *request) -> int {
auto rpc = ingestion_rpc::auto_reply(request);
return _write_svc->ingestion_files(_decree, rpc.request(), rpc.response());
}},
};
}
} // namespace server
} // namespace pegasus