blob: 59203836be152ca4a43bcd9bfc389f823d45c83a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "rocksdb_wrapper.h"
#include <absl/strings/string_view.h>
#include <rocksdb/db.h>
#include <rocksdb/slice.h>
#include <rocksdb/status.h>
#include "base/meta_store.h"
#include "base/pegasus_value_schema.h"
#include "common/duplication_common.h"
#include "pegasus_key_schema.h"
#include "pegasus_utils.h"
#include "pegasus_write_service_impl.h"
#include "server/logging_utils.h"
#include "server/pegasus_server_impl.h"
#include "server/pegasus_write_service.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/fail_point.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
#include "utils/ports.h"
// Test-only knob: when this flag is non-zero, rocksdb_wrapper::write() returns its value
// as the error code instead of committing the pending write batch.
DSN_DEFINE_int32(pegasus.server,
inject_write_error_for_test,
0,
"Which error code to inject in write path, 0 means no error. Only for test.");
// NOTE(review): FT_MUTABLE presumably allows the flag to be changed while the server is
// running -- confirm against utils/flags.h.
DSN_TAG_VARIABLE(inject_write_error_for_test, FT_MUTABLE);
// Copied into rocksdb::IngestExternalFileOptions::write_global_seqno by
// rocksdb_wrapper::ingest_files().
DSN_DEFINE_bool(pegasus.server,
rocksdb_write_global_seqno,
false,
"If write_global_seqno is true, rocksdb will modify "
"'rocksdb.external_sst_file.global_seqno' of ssttable file during ingest process. "
"If false, it will not be modified.");
// Counter incremented by rocksdb_wrapper::get() each time a value that was read turns out
// to be expired.
METRIC_DECLARE_counter(read_expired_values);
namespace pegasus {
namespace server {
// Builds a wrapper around the RocksDB handles and settings exposed by `server`:
// the DB handle, the read options of the data column family, the meta column
// family handle and the pegasus data version. The default TTL starts at 0.
rocksdb_wrapper::rocksdb_wrapper(pegasus_server_impl *server)
    : replica_base(server),
      _db(server->_db),
      _rd_opts(server->_data_cf_rd_opts),
      _meta_cf(server->_meta_cf),
      _pegasus_data_version(server->_pegasus_data_version),
      METRIC_VAR_INIT_replica(read_expired_values),
      _default_ttl(0)
{
    // Write-ahead logging is switched off because replication already takes
    // care of logging.
    auto write_options = std::make_unique<rocksdb::WriteOptions>();
    write_options->disableWAL = true;
    _wt_opts = std::move(write_options);

    _value_generator = std::make_unique<pegasus_value_generator>();
    _write_batch = std::make_unique<rocksdb::WriteBatch>();
}
// Reads `raw_key` from the DB into `ctx`.
//
// On a hit, `ctx->found` is set to true, `ctx->expire_ts` is extracted from the
// raw value, and `ctx->expired` is set to true (and the read_expired_values
// metric bumped) when that timestamp has already passed. On a miss only
// `ctx->found` is set to false. Both cases return rocksdb::Status::kOk.
// Any other RocksDB failure is logged and its status code is returned.
//
// Note that `ctx->expired` is never reset to false here.
int rocksdb_wrapper::get(absl::string_view raw_key, /*out*/ db_get_context *ctx)
{
    FAIL_POINT_INJECT_F("db_get", [](absl::string_view) -> int { return FAIL_DB_GET; });

    const rocksdb::Status status =
        _db->Get(_rd_opts, utils::to_rocksdb_slice(raw_key), &(ctx->raw_value));
    const bool hit = status.ok();

    // Everything except "ok" and "not found" is a genuine failure.
    if (dsn_unlikely(!hit && !status.IsNotFound())) {
        dsn::blob hash_key;
        dsn::blob sort_key;
        pegasus_restore_key(dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key);
        LOG_ERROR_ROCKSDB("Get",
                          status.ToString(),
                          "hash_key: {}, sort_key: {}",
                          utils::c_escape_sensitive_string(hash_key),
                          utils::c_escape_sensitive_string(sort_key));
        return status.code();
    }

    ctx->found = hit;
    if (hit) {
        ctx->expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, ctx->raw_value);
        if (check_if_ts_expired(utils::epoch_now(), ctx->expire_ts)) {
            ctx->expired = true;
            METRIC_VAR_INCREMENT(read_expired_values);
        }
    }

    // A missing key is an acceptable outcome, so it is reported as success too.
    return rocksdb::Status::kOk;
}
// Convenience overload: queues a put into the pending write batch using the
// write context produced by db_write_context::empty(decree).
// Returns whatever write_batch_put_ctx() returns.
int rocksdb_wrapper::write_batch_put(int64_t decree,
                                     absl::string_view raw_key,
                                     absl::string_view value,
                                     uint32_t expire_sec)
{
    const auto &write_ctx = db_write_context::empty(decree);
    return write_batch_put_ctx(write_ctx, raw_key, value, expire_sec);
}
int rocksdb_wrapper::write_batch_put_ctx(const db_write_context &ctx,
absl::string_view raw_key,
absl::string_view value,
uint32_t expire_sec)
{
FAIL_POINT_INJECT_F("db_write_batch_put",
[](absl::string_view) -> int { return FAIL_DB_WRITE_BATCH_PUT; });
uint64_t new_timetag = ctx.remote_timetag;
if (!ctx.is_duplicated_write()) { // local write
new_timetag = generate_timetag(
ctx.timestamp, dsn::replication::get_current_dup_cluster_id_or_default(), false);
}
if (ctx.verify_timetag && // needs read-before-write
_pegasus_data_version >= 1 && // data version 0 doesn't support timetag.
!raw_key.empty()) { // not an empty write
db_get_context get_ctx;
int err = get(raw_key, &get_ctx);
if (dsn_unlikely(err != rocksdb::Status::kOk)) {
return err;
}
// if record exists and is not expired.
if (get_ctx.found && !get_ctx.expired) {
uint64_t local_timetag =
pegasus_extract_timetag(_pegasus_data_version, get_ctx.raw_value);
if (local_timetag >= new_timetag) {
// ignore this stale update with lower timetag,
// and write an empty record instead
raw_key = value = absl::string_view();
}
}
}
rocksdb::Slice skey = utils::to_rocksdb_slice(raw_key);
rocksdb::SliceParts skey_parts(&skey, 1);
rocksdb::SliceParts svalue = _value_generator->generate_value(
_pegasus_data_version, value, db_expire_ts(expire_sec), new_timetag);
rocksdb::Status s = _write_batch->Put(skey_parts, svalue);
if (dsn_unlikely(!s.ok())) {
::dsn::blob hash_key, sort_key;
pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key);
LOG_ERROR_ROCKSDB("WriteBatchPut",
s.ToString(),
"decree: {}, hash_key: {}, sort_key: {}, expire_ts: {}",
ctx.decree,
utils::c_escape_sensitive_string(hash_key),
utils::c_escape_sensitive_string(sort_key),
expire_sec);
}
return s.code();
}
// Commits the pending write batch to RocksDB.
//
// The batch must not be empty (checked). Before committing, the given `decree`
// is added to the same batch as meta_store::LAST_FLUSHED_DECREE in the meta
// column family, so it is written together with the data. The batch is not
// cleared here; see clear_up_write_batch().
//
// Returns rocksdb::Status::kOk on success, the injected error code when
// FLAGS_inject_write_error_for_test is non-zero, or the status code of the
// failing RocksDB operation.
int rocksdb_wrapper::write(int64_t decree)
{
    CHECK_GT(_write_batch->Count(), 0);

    // The flag is mutable, so take a single snapshot of it: reading it once for
    // the check and again for the return value could report kOk without having
    // written anything if the flag is reset in between.
    const int32_t injected_error = FLAGS_inject_write_error_for_test;
    if (dsn_unlikely(injected_error != rocksdb::Status::kOk)) {
        return injected_error;
    }

    FAIL_POINT_INJECT_F("db_write", [](absl::string_view) -> int { return FAIL_DB_WRITE; });

    // Record the decree alongside the data in the same batch.
    rocksdb::Status status =
        _write_batch->Put(_meta_cf, meta_store::LAST_FLUSHED_DECREE, std::to_string(decree));
    if (dsn_unlikely(!status.ok())) {
        LOG_ERROR_ROCKSDB("Write",
                          status.ToString(),
                          "put decree of meta cf into batch error, decree: {}",
                          decree);
        return status.code();
    }

    status = _db->Write(*_wt_opts, _write_batch.get());
    if (dsn_unlikely(!status.ok())) {
        LOG_ERROR_ROCKSDB("Write", status.ToString(), "write rocksdb error, decree: {}", decree);
    }
    return status.code();
}
int rocksdb_wrapper::write_batch_delete(int64_t decree, absl::string_view raw_key)
{
FAIL_POINT_INJECT_F("db_write_batch_delete",
[](absl::string_view) -> int { return FAIL_DB_WRITE_BATCH_DELETE; });
rocksdb::Status s = _write_batch->Delete(utils::to_rocksdb_slice(raw_key));
if (dsn_unlikely(!s.ok())) {
dsn::blob hash_key, sort_key;
pegasus_restore_key(dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key);
LOG_ERROR_ROCKSDB("write_batch_delete",
s.ToString(),
"decree: {}, hash_key: {}, sort_key: {}",
decree,
utils::c_escape_sensitive_string(hash_key),
utils::c_escape_sensitive_string(sort_key));
}
return s.code();
}
// Discards every operation currently queued in the pending write batch.
void rocksdb_wrapper::clear_up_write_batch()
{
    _write_batch->Clear();
}
int rocksdb_wrapper::ingest_files(int64_t decree,
const std::vector<std::string> &sst_file_list,
const bool ingest_behind)
{
rocksdb::IngestExternalFileOptions ifo;
ifo.move_files = true;
ifo.ingest_behind = ingest_behind;
ifo.write_global_seqno = FLAGS_rocksdb_write_global_seqno;
rocksdb::Status s = _db->IngestExternalFile(sst_file_list, ifo);
if (dsn_unlikely(!s.ok())) {
LOG_ERROR_ROCKSDB("IngestExternalFile",
s.ToString(),
"decree = {}, ingest_behind = {}",
decree,
ingest_behind);
} else {
LOG_INFO_ROCKSDB("IngestExternalFile",
"Ingest files succeed, decree = {}, ingest_behind = {}",
decree,
ingest_behind);
}
return s.code();
}
// Updates the default TTL used by db_expire_ts(). Nothing happens (and nothing
// is logged) when `ttl` equals the current value.
void rocksdb_wrapper::set_default_ttl(uint32_t ttl)
{
    if (_default_ttl == ttl) {
        return;
    }

    _default_ttl = ttl;
    LOG_INFO_PREFIX("update _default_ttl to {}", ttl);
}
// Resolves the expiration timestamp to store for a write.
// An explicit (non-zero) `expire_ts` is returned unchanged. When it is 0 and a
// default TTL is configured, the result is the current epoch time plus
// `_default_ttl`; with no default TTL the result stays 0.
uint32_t rocksdb_wrapper::db_expire_ts(uint32_t expire_ts)
{
    // Either the write carries its own expiration, or there is no default to apply.
    if (expire_ts != 0 || _default_ttl == 0) {
        return expire_ts;
    }

    return utils::epoch_now() + _default_ttl;
}
} // namespace server
} // namespace pegasus