// blob: 6eec4b76010962e6039d0c69b879784ee35dd121 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <limits>

#include <gtest/gtest_prod.h>

#include "base/idl_utils.h"
#include "base/meta_store.h"
#include "base/pegasus_key_schema.h"
#include "logging_utils.h"
#include "pegasus_server_impl.h"
#include "pegasus_write_service.h"
#include "rocksdb_wrapper.h"
#include "utils/defer.h"
#include "utils/env.h"
#include "utils/filesystem.h"
#include "utils/string_conv.h"
#include "utils/strings.h"
namespace pegasus {
namespace server {
/// Internal error codes used for fail injection.
/// They are plain negative integers, one distinct value per code.
// TODO(yingchun): Use real rocksdb::Status::code.
static constexpr int FAIL_DB_WRITE_BATCH_PUT{-101};
static constexpr int FAIL_DB_WRITE_BATCH_DELETE{-102};
static constexpr int FAIL_DB_WRITE{-103};
static constexpr int FAIL_DB_GET{-104};
/// Result of a single point read against the DB.
/// A default-constructed context describes "nothing found": empty value, no expiration
/// time, not expired.
struct db_get_context
{
    // Raw value as read from the DB.
    std::string raw_value;

    // Whether the record was found in the DB.
    bool found = false;

    // Expiration time encoded in `raw_value`.
    uint32_t expire_ts = 0;

    // Whether the record is expired.
    bool expired = false;
};
// Builds the path of every file listed in `metadata`, optionally verifying each one.
//
// The files of `metadata.files` are visited in order. Each path is built by combining
// `bulk_load_dir` with the file name. When `verify_before_ingest` is true the file is
// checked with verify_file() against the md5 and size recorded in the metadata, and the
// scan stops at the first file that fails the check.
//
// \param bulk_load_dir directory the file names are resolved against
// \param verify_before_ingest whether each file is verified before being accepted
// \param metadata description (name, md5, size) of the files to collect
// \param files_path [out] the path of every accepted file is appended here; on failure it
//        still holds the paths accepted before the failing file
// \return ERR_WRONG_CHECKSUM as soon as a file fails verification, ERR_OK otherwise
inline dsn::error_code get_external_files_path(const std::string &bulk_load_dir,
                                               const bool verify_before_ingest,
                                               const dsn::replication::bulk_load_metadata &metadata,
                                               /*out*/ std::vector<std::string> &files_path)
{
    files_path.reserve(files_path.size() + metadata.files.size());
    for (const auto &f_meta : metadata.files) {
        auto file_name = dsn::utils::filesystem::path_combine(bulk_load_dir, f_meta.name);
        if (verify_before_ingest &&
            !dsn::utils::filesystem::verify_file(
                file_name, dsn::utils::FileDataType::kSensitive, f_meta.md5, f_meta.size)) {
            // Report the failure directly. Deriving it from files_path.size() afterwards
            // is only correct when the caller hands in an empty vector.
            return dsn::ERR_WRONG_CHECKSUM;
        }
        files_path.emplace_back(std::move(file_name));
    }
    return dsn::ERR_OK;
}
class pegasus_write_service::impl : public dsn::replication::replica_base
{
public:
// Captures the primary host:port string and the data version from `server`, and creates
// the rocksdb_wrapper that every write of this object goes through.
explicit impl(pegasus_server_impl *server)
    : replica_base(server),
      _primary_host_port(server->_primary_host_port),
      _pegasus_data_version(server->_pegasus_data_version),
      _rocksdb_wrapper(std::make_unique<rocksdb_wrapper>(server))
{
}
// Writes an empty record (empty key, empty value, expire ts 0) for `decree`.
// Other methods use it on their rejection paths so that rocksdb's last flushed decree
// still advances.
//
// \return the error of staging the record if that fails, otherwise the result of the write.
int empty_put(int64_t decree)
{
    // The write batch is cleared on every exit path of this function.
    auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });

    const int put_err =
        _rocksdb_wrapper->write_batch_put(decree, absl::string_view(), absl::string_view(), 0);
    if (put_err == rocksdb::Status::kOk) {
        return _rocksdb_wrapper->write(decree);
    }
    return put_err;
}
// Puts every key/value pair of `update` under `update.hash_key` in one write.
//
// The response header (app id, partition index, decree, server) is always filled in.
// An empty `update.kvs` is rejected: `resp.error` becomes kInvalidArgument and an empty
// record is written instead, whose result is returned.
//
// \return the first non-zero error met while staging the pairs, otherwise the result of
//         the write; in both cases the same value is stored in `resp.error`.
int multi_put(const db_write_context &ctx,
              const dsn::apps::multi_put_request &update,
              dsn::apps::update_response &resp)
{
    const int64_t decree = ctx.decree;

    resp.app_id = get_gpid().get_app_id();
    resp.partition_index = get_gpid().get_partition_index();
    resp.decree = decree;
    resp.server = _primary_host_port;

    if (update.kvs.empty()) {
        LOG_ERROR_PREFIX("invalid argument for multi_put: decree = {}, error = {}",
                         decree,
                         "request.kvs is empty");
        resp.error = rocksdb::Status::kInvalidArgument;
        // we should write empty record to update rocksdb's last flushed decree
        return empty_put(decree);
    }

    // Whatever happens below, the staged batch is dropped when we leave.
    auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });

    const auto expire_ts = static_cast<uint32_t>(update.expire_ts_seconds);
    for (const auto &kv : update.kvs) {
        const dsn::blob raw_key =
            composite_raw_key(update.hash_key.to_string_view(), kv.key.to_string_view());
        resp.error = _rocksdb_wrapper->write_batch_put_ctx(
            ctx, raw_key.to_string_view(), kv.value.to_string_view(), expire_ts);
        if (resp.error) {
            return resp.error;
        }
    }

    resp.error = _rocksdb_wrapper->write(decree);
    return resp.error;
}
// Deletes every sort key of `update` under `update.hash_key` in one write.
//
// The response header (app id, partition index, decree, server) is always filled in.
// An empty `update.sort_keys` is rejected: `resp.error` becomes kInvalidArgument and an
// empty record is written instead, whose result is returned.
// `resp.count` is set to the number of requested sort keys only when the write succeeds.
//
// \return the first non-zero error met while staging the deletes, otherwise the result of
//         the write; in both cases the same value is stored in `resp.error`.
int multi_remove(int64_t decree,
                 const dsn::apps::multi_remove_request &update,
                 dsn::apps::multi_remove_response &resp)
{
    resp.app_id = get_gpid().get_app_id();
    resp.partition_index = get_gpid().get_partition_index();
    resp.decree = decree;
    resp.server = _primary_host_port;

    if (update.sort_keys.empty()) {
        LOG_ERROR_PREFIX("invalid argument for multi_remove: decree = {}, error = {}",
                         decree,
                         "request.sort_keys is empty");
        resp.error = rocksdb::Status::kInvalidArgument;
        // we should write empty record to update rocksdb's last flushed decree
        return empty_put(decree);
    }

    // Whatever happens below, the staged batch is dropped when we leave.
    auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });

    for (const auto &sort_key : update.sort_keys) {
        const dsn::blob raw_key =
            composite_raw_key(update.hash_key.to_string_view(), sort_key.to_string_view());
        resp.error = _rocksdb_wrapper->write_batch_delete(decree, raw_key.to_string_view());
        if (resp.error) {
            return resp.error;
        }
    }

    resp.error = _rocksdb_wrapper->write(decree);
    if (resp.error == rocksdb::Status::kOk) {
        resp.count = update.sort_keys.size();
    }
    return resp.error;
}
// Adds `update.increment` to the integer stored under `update.key` and writes the result
// back as a decimal string.
//
// Old value handling:
//   - record not found, record expired, or empty user data: the old value counts as 0;
//   - user data that does not parse as int64: rejected with kInvalidArgument;
//   - a sum that would leave the int64 range: rejected with kInvalidArgument, and
//     `resp.new_value` carries the unchanged old value.
// On both rejections an empty record is written instead and its result is returned.
//
// Expiration handling:
//   - record not found or expired: `update.expire_ts_seconds` is used if > 0, else 0;
//   - otherwise: 0 keeps the stored expire ts, a negative value resets it to 0, a
//     positive value replaces it.
//
// \return the error of the read, of staging the put, or of the write. Except on the two
//         rejection paths above, the same value is stored in `resp.error`.
//         `resp.new_value` is set to the new value only when the write succeeds.
int incr(int64_t decree, const dsn::apps::incr_request &update, dsn::apps::incr_response &resp)
{
    resp.app_id = get_gpid().get_app_id();
    resp.partition_index = get_gpid().get_partition_index();
    resp.decree = decree;
    resp.server = _primary_host_port;

    absl::string_view raw_key = update.key.to_string_view();
    int64_t new_value = 0;
    uint32_t new_expire_ts = 0;
    db_get_context get_ctx;
    int err = _rocksdb_wrapper->get(raw_key, &get_ctx);
    if (err != rocksdb::Status::kOk) {
        resp.error = err;
        return err;
    }

    if (!get_ctx.found || get_ctx.expired) {
        // old value is not found or its ttl timed out, set to 0 before increment
        new_value = update.increment;
        new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
    } else {
        ::dsn::blob old_value;
        pegasus_extract_user_data(
            _pegasus_data_version, std::move(get_ctx.raw_value), old_value);
        if (old_value.length() == 0) {
            // empty old value, set to 0 before increment
            new_value = update.increment;
        } else {
            int64_t old_value_int;
            if (!dsn::buf2int64(old_value.to_string_view(), old_value_int)) {
                // invalid old value
                LOG_ERROR_PREFIX("incr failed: decree = {}, error = "
                                 "old value \"{}\" is not an integer or out of range",
                                 decree,
                                 utils::c_escape_sensitive_string(old_value));
                resp.error = rocksdb::Status::kInvalidArgument;
                // we should write empty record to update rocksdb's last flushed decree
                return empty_put(decree);
            }

            // Signed overflow is undefined behaviour, so the range check has to happen
            // BEFORE the addition; checking the wrapped sum afterwards is not reliable.
            constexpr int64_t kInt64Max = std::numeric_limits<int64_t>::max();
            constexpr int64_t kInt64Min = std::numeric_limits<int64_t>::min();
            const bool out_of_range =
                (update.increment > 0 && old_value_int > kInt64Max - update.increment) ||
                (update.increment < 0 && old_value_int < kInt64Min - update.increment);
            if (out_of_range) {
                // new value is out of range, return old value by 'new_value'
                LOG_ERROR_PREFIX("incr failed: decree = {}, error = "
                                 "new value is out of range, old_value = {}, increment = {}",
                                 decree,
                                 old_value_int,
                                 update.increment);
                resp.error = rocksdb::Status::kInvalidArgument;
                resp.new_value = old_value_int;
                // we should write empty record to update rocksdb's last flushed decree
                return empty_put(decree);
            }
            new_value = old_value_int + update.increment;
        }

        // set new ttl
        if (update.expire_ts_seconds == 0) {
            new_expire_ts = get_ctx.expire_ts;
        } else if (update.expire_ts_seconds < 0) {
            new_expire_ts = 0;
        } else { // update.expire_ts_seconds > 0
            new_expire_ts = update.expire_ts_seconds;
        }
    }

    auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });
    resp.error = _rocksdb_wrapper->write_batch_put(
        decree, update.key.to_string_view(), std::to_string(new_value), new_expire_ts);
    if (resp.error) {
        return resp.error;
    }

    resp.error = _rocksdb_wrapper->write(decree);
    if (resp.error == rocksdb::Status::kOk) {
        resp.new_value = new_value;
    }
    return resp.error;
}
// Conditionally sets a value: reads the record at (hash_key, check_sort_key), evaluates
// the requested check against it and, if the check passes, writes `update.set_value`.
//
// The value is written to `update.set_sort_key` when `update.set_diff_sort_key` is set,
// otherwise to the checked key itself. When the check does not pass, an empty record is
// written instead so that rocksdb's last flushed decree still advances.
//
// Outcome reporting:
//   - unsupported check type: `resp.error` = kInvalidArgument, returns empty_put()'s result;
//   - read failure: the read error is stored in `resp.error` and returned;
//   - staging or write failure: that error is stored in `resp.error` and returned;
//   - write succeeded: returns kOk. If the check did not pass, `resp.error` is
//     kInvalidArgument (validate_check flagged an invalid argument) or kTryAgain.
// When `update.return_check_value` is set, the checked value is reported through `resp`.
int check_and_set(int64_t decree,
                  const dsn::apps::check_and_set_request &update,
                  dsn::apps::check_and_set_response &resp)
{
    resp.app_id = get_gpid().get_app_id();
    resp.partition_index = get_gpid().get_partition_index();
    resp.decree = decree;
    resp.server = _primary_host_port;

    if (!is_check_type_supported(update.check_type)) {
        LOG_ERROR_PREFIX("invalid argument for check_and_set: decree = {}, error = {}",
                         decree,
                         fmt::format("check type {} not supported", update.check_type));
        resp.error = rocksdb::Status::kInvalidArgument;
        // we should write empty record to update rocksdb's last flushed decree
        return empty_put(decree);
    }

    // Read the record the check is evaluated against.
    ::dsn::blob check_key;
    pegasus_generate_key(check_key, update.hash_key, update.check_sort_key);

    db_get_context check_ctx;
    int err = _rocksdb_wrapper->get(check_key.to_string_view(), &check_ctx);
    if (err != rocksdb::Status::kOk) {
        // read check value failed
        LOG_ERROR_ROCKSDB("Error to GetCheckValue for CheckAndSet decree: {}, hash_key: {}, "
                          "check_sort_key: {}",
                          decree,
                          utils::c_escape_sensitive_string(update.hash_key),
                          utils::c_escape_sensitive_string(update.check_sort_key));
        resp.error = err;
        return resp.error;
    }

    // An expired record counts as absent.
    const bool value_exist = check_ctx.found && !check_ctx.expired;
    ::dsn::blob check_value;
    if (value_exist) {
        pegasus_extract_user_data(
            _pegasus_data_version, std::move(check_ctx.raw_value), check_value);
    }

    if (update.return_check_value) {
        resp.check_value_returned = true;
        if (value_exist) {
            resp.check_value_exist = true;
            resp.check_value = check_value;
        }
    }

    bool invalid_argument = false;
    const bool passed = validate_check(decree,
                                       update.check_type,
                                       update.check_operand,
                                       value_exist,
                                       check_value,
                                       invalid_argument);

    // From here on something is staged in the write batch; drop it on every exit path.
    auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });

    if (!passed) {
        // check not passed, write empty record to update rocksdb's last flushed decree
        resp.error = _rocksdb_wrapper->write_batch_put(
            decree, absl::string_view(), absl::string_view(), 0);
    } else {
        // check passed, write new value
        ::dsn::blob set_key;
        if (update.set_diff_sort_key) {
            pegasus_generate_key(set_key, update.hash_key, update.set_sort_key);
        } else {
            set_key = check_key;
        }
        resp.error = _rocksdb_wrapper->write_batch_put(
            decree,
            set_key.to_string_view(),
            update.set_value.to_string_view(),
            static_cast<uint32_t>(update.set_expire_ts_seconds));
    }
    if (resp.error) {
        return resp.error;
    }

    resp.error = _rocksdb_wrapper->write(decree);
    if (resp.error) {
        return resp.error;
    }

    if (!passed) {
        // check not passed, return proper error code to user
        if (invalid_argument) {
            resp.error = rocksdb::Status::kInvalidArgument;
        } else {
            resp.error = rocksdb::Status::kTryAgain;
        }
    }
    return rocksdb::Status::kOk;
}
// Conditionally applies a list of mutations: reads the record at
// (hash_key, check_sort_key), evaluates the requested check against it and, if the check
// passes, stages every mutation of `update.mutate_list` (puts and deletes under
// `update.hash_key`) and writes them together.
//
// When the check does not pass, an empty record is written instead so that rocksdb's last
// flushed decree still advances.
//
// Outcome reporting:
//   - empty mutate list, a mutation that is neither MO_PUT nor MO_DELETE, or an
//     unsupported check type: `resp.error` = kInvalidArgument, returns empty_put()'s result;
//   - read failure: the read error is stored in `resp.error` and returned;
//   - staging or write failure: that error is stored in `resp.error` and returned;
//   - write succeeded: returns kOk. If the check did not pass, `resp.error` is
//     kInvalidArgument (validate_check flagged an invalid argument) or kTryAgain.
// When `update.return_check_value` is set, the checked value is reported through `resp`.
int check_and_mutate(int64_t decree,
                     const dsn::apps::check_and_mutate_request &update,
                     dsn::apps::check_and_mutate_response &resp)
{
    resp.app_id = get_gpid().get_app_id();
    resp.partition_index = get_gpid().get_partition_index();
    resp.decree = decree;
    resp.server = _primary_host_port;

    // ---- argument validation ----
    if (update.mutate_list.empty()) {
        LOG_ERROR_PREFIX("invalid argument for check_and_mutate: decree = {}, error = {}",
                         decree,
                         "mutate list is empty");
        resp.error = rocksdb::Status::kInvalidArgument;
        // we should write empty record to update rocksdb's last flushed decree
        return empty_put(decree);
    }

    int mutation_index = 0;
    for (const auto &mu : update.mutate_list) {
        const bool known_operation = mu.operation == ::dsn::apps::mutate_operation::MO_PUT ||
                                     mu.operation == ::dsn::apps::mutate_operation::MO_DELETE;
        if (!known_operation) {
            LOG_ERROR_PREFIX("invalid argument for check_and_mutate: decree = {}, error = "
                             "mutation[{}] uses invalid operation {}",
                             decree,
                             mutation_index,
                             mu.operation);
            resp.error = rocksdb::Status::kInvalidArgument;
            // we should write empty record to update rocksdb's last flushed decree
            return empty_put(decree);
        }
        ++mutation_index;
    }

    if (!is_check_type_supported(update.check_type)) {
        LOG_ERROR_PREFIX("invalid argument for check_and_mutate: decree = {}, error = {}",
                         decree,
                         fmt::format("check type {} not supported", update.check_type));
        resp.error = rocksdb::Status::kInvalidArgument;
        // we should write empty record to update rocksdb's last flushed decree
        return empty_put(decree);
    }

    // ---- read the record the check is evaluated against ----
    ::dsn::blob check_key;
    pegasus_generate_key(check_key, update.hash_key, update.check_sort_key);

    db_get_context check_ctx;
    int err = _rocksdb_wrapper->get(check_key.to_string_view(), &check_ctx);
    if (err != rocksdb::Status::kOk) {
        // read check value failed
        LOG_ERROR_ROCKSDB("Error to GetCheckValue for CheckAndMutate decree: {}, hash_key: {}, "
                          "check_sort_key: {}",
                          decree,
                          utils::c_escape_sensitive_string(update.hash_key),
                          utils::c_escape_sensitive_string(update.check_sort_key));
        resp.error = err;
        return resp.error;
    }

    // An expired record counts as absent.
    const bool value_exist = check_ctx.found && !check_ctx.expired;
    ::dsn::blob check_value;
    if (value_exist) {
        pegasus_extract_user_data(
            _pegasus_data_version, std::move(check_ctx.raw_value), check_value);
    }

    if (update.return_check_value) {
        resp.check_value_returned = true;
        if (value_exist) {
            resp.check_value_exist = true;
            resp.check_value = check_value;
        }
    }

    // ---- evaluate the check and stage the write ----
    bool invalid_argument = false;
    const bool passed = validate_check(decree,
                                       update.check_type,
                                       update.check_operand,
                                       value_exist,
                                       check_value,
                                       invalid_argument);

    // From here on something is staged in the write batch; drop it on every exit path.
    auto cleanup = dsn::defer([this]() { _rocksdb_wrapper->clear_up_write_batch(); });

    if (!passed) {
        // check not passed, write empty record to update rocksdb's last flushed decree
        resp.error = _rocksdb_wrapper->write_batch_put(
            decree, absl::string_view(), absl::string_view(), 0);
    } else {
        for (const auto &m : update.mutate_list) {
            ::dsn::blob key;
            pegasus_generate_key(key, update.hash_key, m.sort_key);
            if (m.operation == ::dsn::apps::mutate_operation::MO_PUT) {
                resp.error = _rocksdb_wrapper->write_batch_put(
                    decree,
                    key.to_string_view(),
                    m.value.to_string_view(),
                    static_cast<uint32_t>(m.set_expire_ts_seconds));
            } else {
                CHECK_EQ(m.operation, ::dsn::apps::mutate_operation::MO_DELETE);
                resp.error = _rocksdb_wrapper->write_batch_delete(decree, key.to_string_view());
            }

            // in case of failure, cancel mutations
            if (resp.error) {
                break;
            }
        }
    }
    if (resp.error) {
        return resp.error;
    }

    // ---- commit ----
    resp.error = _rocksdb_wrapper->write(decree);
    if (resp.error) {
        return resp.error;
    }

    if (!passed) {
        // check not passed, return proper error code to user
        if (invalid_argument) {
            resp.error = rocksdb::Status::kInvalidArgument;
        } else {
            resp.error = rocksdb::Status::kTryAgain;
        }
    }
    return rocksdb::Status::kOk;
}
// Ingests the external files described by `req` from `bulk_load_dir`.
//
// Steps: reject the request if its ballot is older than `current_ballot`, collect (and,
// if requested by `req.verify_before_ingest`, verify) the file paths, then hand them to
// rocksdb for ingestion.
//
// \return ERR_INVALID_VERSION: replay or commit out-date ingest request
// \return ERR_WRONG_CHECKSUM: verify files failed
// \return ERR_INGESTION_FAILED: rocksdb ingestion failed
// \return ERR_OK: rocksdb ingestion succeed
dsn::error_code ingest_files(const int64_t decree,
                             const std::string &bulk_load_dir,
                             const dsn::replication::ingestion_request &req,
                             const int64_t current_ballot)
{
    // if ballot updated, ignore this request
    if (req.ballot < current_ballot) {
        LOG_WARNING_PREFIX("out-dated ingestion request, ballot changed, request({}) vs "
                           "current({}), ignore it",
                           req.ballot,
                           current_ballot);
        return dsn::ERR_INVALID_VERSION;
    }

    // verify external files before ingestion
    std::vector<std::string> external_files;
    const dsn::error_code collect_result = get_external_files_path(
        bulk_load_dir, req.verify_before_ingest, req.metadata, external_files);
    if (collect_result != dsn::ERR_OK) {
        return collect_result;
    }

    // ingest external files
    const auto ingest_status =
        _rocksdb_wrapper->ingest_files(decree, external_files, req.ingest_behind);
    if (dsn_unlikely(ingest_status != rocksdb::Status::kOk)) {
        return dsn::ERR_INGESTION_FAILED;
    }
    return dsn::ERR_OK;
}
/// For batch write.

// Stages one put in the write batch and remembers `resp` so that it can be filled in
// when the batch is committed or aborted. `resp` must therefore stay alive until then.
//
// \return the result of staging the put, which is also stored in `resp.error`.
int batch_put(const db_write_context &ctx,
              const dsn::apps::update_request &update,
              dsn::apps::update_response &resp)
{
    const auto expire_ts = static_cast<uint32_t>(update.expire_ts_seconds);
    resp.error = _rocksdb_wrapper->write_batch_put_ctx(
        ctx, update.key.to_string_view(), update.value.to_string_view(), expire_ts);

    _update_responses.emplace_back(&resp);
    return resp.error;
}
// Stages one delete of `key` in the write batch and remembers `resp` so that it can be
// filled in when the batch is committed or aborted. `resp` must therefore stay alive
// until then.
//
// \return the result of staging the delete, which is also stored in `resp.error`.
int batch_remove(int64_t decree, const dsn::blob &key, dsn::apps::update_response &resp)
{
    const absl::string_view raw_key = key.to_string_view();
    resp.error = _rocksdb_wrapper->write_batch_delete(decree, raw_key);

    _update_responses.emplace_back(&resp);
    return resp.error;
}
// Writes everything staged so far, then fills in the remembered responses with the write
// result and resets the batch state.
//
// \return the result of the write.
int batch_commit(int64_t decree)
{
    const int write_err = _rocksdb_wrapper->write(decree);
    clear_up_batch_states(decree, write_err);
    return write_err;
}
// Gives up the current batch without writing it: the remembered responses are filled in
// with `err` and the batch state is reset.
void batch_abort(int64_t decree, int err)
{
    clear_up_batch_states(decree, err);
}
// Forwards the default TTL to the underlying rocksdb wrapper.
void set_default_ttl(uint32_t ttl)
{
    _rocksdb_wrapper->set_default_ttl(ttl);
}
private:
void clear_up_batch_states(int64_t decree, int err)
{
if (!_update_responses.empty()) {
dsn::apps::update_response resp;
resp.error = err;
resp.app_id = get_gpid().get_app_id();
resp.partition_index = get_gpid().get_partition_index();
resp.decree = decree;
resp.server = _primary_host_port;
for (dsn::apps::update_response *uresp : _update_responses) {
*uresp = resp;
}
_update_responses.clear();
}
_rocksdb_wrapper->clear_up_write_batch();
}
// Returns the raw key that pegasus_generate_key() produces for (`hash_key`, `sort_key`).
static dsn::blob composite_raw_key(absl::string_view hash_key, absl::string_view sort_key)
{
    dsn::blob generated;
    pegasus_generate_key(generated, hash_key, sort_key);
    return generated;
}
// Returns true if `check_type` lies within the supported range, i.e. between
// CT_NO_CHECK and CT_VALUE_INT_GREATER (both inclusive).
static bool is_check_type_supported(::dsn::apps::cas_check_type::type check_type)
{
    if (check_type < ::dsn::apps::cas_check_type::CT_NO_CHECK) {
        return false;
    }
    return check_type <= ::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER;
}
// Evaluates the check `check_type` of a check_and_set / check_and_mutate request against
// the value that was read.
//
// \param decree only used in error logs
// \param check_type the kind of check to evaluate
// \param check_operand the operand the value is matched / compared against
// \param value_exist whether a value was read for the checked key
// \param value the value that was read; only meaningful when `value_exist` is true
// \param invalid_argument [out] reset to false on entry; set to true when an int
//        comparison is requested but the value or the operand is not a valid int64
// \return true if check passed.
// for int compare, if check operand or value are not valid integer, then return false,
// and set out param `invalid_argument' to true.
//
// NOTE: the bytes/int comparisons are decided with `<=` / `>=` on `check_type` itself,
// so they assume the enumerators of each group are declared in the order
// LESS, LESS_OR_EQUAL, EQUAL, GREATER_OR_EQUAL, GREATER - confirm against the IDL before
// reordering or inserting enumerators.
bool validate_check(int64_t decree,
::dsn::apps::cas_check_type::type check_type,
const ::dsn::blob &check_operand,
bool value_exist,
const ::dsn::blob &value,
bool &invalid_argument)
{
invalid_argument = false;
switch (check_type) {
// existence / emptiness checks: the operand is not used
case ::dsn::apps::cas_check_type::CT_NO_CHECK:
return true;
case ::dsn::apps::cas_check_type::CT_VALUE_NOT_EXIST:
return !value_exist;
case ::dsn::apps::cas_check_type::CT_VALUE_NOT_EXIST_OR_EMPTY:
return !value_exist || value.length() == 0;
case ::dsn::apps::cas_check_type::CT_VALUE_EXIST:
return value_exist;
case ::dsn::apps::cas_check_type::CT_VALUE_NOT_EMPTY:
return value_exist && value.length() != 0;
// substring / prefix / postfix match of the operand inside the value
case ::dsn::apps::cas_check_type::CT_VALUE_MATCH_ANYWHERE:
case ::dsn::apps::cas_check_type::CT_VALUE_MATCH_PREFIX:
case ::dsn::apps::cas_check_type::CT_VALUE_MATCH_POSTFIX: {
if (!value_exist)
return false;
// an empty operand matches any existing value
if (check_operand.length() == 0)
return true;
// the operand cannot match a value shorter than itself
if (value.length() < check_operand.length())
return false;
if (check_type == ::dsn::apps::cas_check_type::CT_VALUE_MATCH_ANYWHERE) {
return value.to_string_view().find(check_operand.to_string_view()) !=
absl::string_view::npos;
} else if (check_type == ::dsn::apps::cas_check_type::CT_VALUE_MATCH_PREFIX) {
return dsn::utils::mequals(
value.data(), check_operand.data(), check_operand.length());
} else { // check_type == ::dsn::apps::cas_check_type::CT_VALUE_MATCH_POSTFIX
return dsn::utils::mequals(value.data() + value.length() - check_operand.length(),
check_operand.data(),
check_operand.length());
}
}
// comparison of the value and the operand as strings
case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_LESS:
case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_LESS_OR_EQUAL:
case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_EQUAL:
case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_GREATER_OR_EQUAL:
case ::dsn::apps::cas_check_type::CT_VALUE_BYTES_GREATER: {
if (!value_exist)
return false;
int c = value.to_string_view().compare(check_operand.to_string_view());
if (c < 0) {
return check_type <= ::dsn::apps::cas_check_type::CT_VALUE_BYTES_LESS_OR_EQUAL;
} else if (c == 0) {
return check_type >= ::dsn::apps::cas_check_type::CT_VALUE_BYTES_LESS_OR_EQUAL &&
check_type <= ::dsn::apps::cas_check_type::CT_VALUE_BYTES_GREATER_OR_EQUAL;
} else { // c > 0
return check_type >= ::dsn::apps::cas_check_type::CT_VALUE_BYTES_GREATER_OR_EQUAL;
}
}
// comparison of the value and the operand after parsing both as int64
case ::dsn::apps::cas_check_type::CT_VALUE_INT_LESS:
case ::dsn::apps::cas_check_type::CT_VALUE_INT_LESS_OR_EQUAL:
case ::dsn::apps::cas_check_type::CT_VALUE_INT_EQUAL:
case ::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER_OR_EQUAL:
case ::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER: {
if (!value_exist)
return false;
int64_t check_value_int;
if (!dsn::buf2int64(value.to_string_view(), check_value_int)) {
// invalid check value
LOG_ERROR_PREFIX("check failed: decree = {}, error = "
"check value \"{}\" is not an integer or out of range",
decree,
utils::c_escape_sensitive_string(value));
invalid_argument = true;
return false;
}
int64_t check_operand_int;
if (!dsn::buf2int64(check_operand.to_string_view(), check_operand_int)) {
// invalid check operand
LOG_ERROR_PREFIX("check failed: decree = {}, error = "
"check operand \"{}\" is not an integer or out of range",
decree,
utils::c_escape_sensitive_string(check_operand));
invalid_argument = true;
return false;
}
if (check_value_int < check_operand_int) {
return check_type <= ::dsn::apps::cas_check_type::CT_VALUE_INT_LESS_OR_EQUAL;
} else if (check_value_int == check_operand_int) {
return check_type >= ::dsn::apps::cas_check_type::CT_VALUE_INT_LESS_OR_EQUAL &&
check_type <= ::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER_OR_EQUAL;
} else { // check_value_int > check_operand_int
return check_type >= ::dsn::apps::cas_check_type::CT_VALUE_INT_GREATER_OR_EQUAL;
}
}
// callers in this class reject unsupported types via is_check_type_supported() first
default:
CHECK(false, "unsupported check type: {}", check_type);
}
return false;
}
private:
// Test fixtures and test cases that are granted access to the private members.
friend class pegasus_write_service_test;
friend class pegasus_server_write_test;
friend class pegasus_write_service_impl_test;
friend class rocksdb_wrapper_test;
FRIEND_TEST(pegasus_write_service_impl_test, put_verify_timetag);
FRIEND_TEST(pegasus_write_service_impl_test, verify_timetag_compatible_with_version_0);
// Copied from the server at construction; reported as `server` in every response.
const std::string _primary_host_port;
// Copied from the server at construction; passed to pegasus_extract_user_data() when
// decoding values read from the DB.
const uint32_t _pegasus_data_version;
// Every read, staged write and commit of this class goes through this wrapper.
std::unique_ptr<rocksdb_wrapper> _rocksdb_wrapper;
// for setting update_response.error after committed.
// Holds the addresses of the responses passed to batch_put()/batch_remove(); they are
// not owned here and are dropped from the list once the batch is committed or aborted.
std::vector<dsn::apps::update_response *> _update_responses;
};
} // namespace server
} // namespace pegasus