| // Copyright (c) 2017, Xiaomi, Inc. All rights reserved. |
| // This source code is licensed under the Apache License Version 2.0, which |
| // can be found in the LICENSE file in the root directory of this source tree. |
| |
| #include "pegasus_server_test_base.h" |
| #include "server/pegasus_server_write.h" |
| #include "server/pegasus_write_service_impl.h" |
| #include "message_utils.h" |
| |
| #include <dsn/utility/defer.h> |
| |
| namespace pegasus { |
| namespace server { |
| |
| class pegasus_write_service_impl_test : public pegasus_server_test_base |
| { |
| protected: |
| std::unique_ptr<pegasus_server_write> _server_write; |
| pegasus_write_service::impl *_write_impl{nullptr}; |
| |
| public: |
| void SetUp() override |
| { |
| start(); |
| _server_write = dsn::make_unique<pegasus_server_write>(_server.get(), true); |
| _write_impl = _server_write->_write_svc->_impl.get(); |
| } |
| |
| uint64_t read_timestamp_from(dsn::string_view raw_key) |
| { |
| std::string raw_value; |
| rocksdb::Status s = _write_impl->_db->Get( |
| _write_impl->_rd_opts, utils::to_rocksdb_slice(raw_key), &raw_value); |
| |
| uint64_t local_timetag = |
| pegasus_extract_timetag(_write_impl->_pegasus_data_version, raw_value); |
| return extract_timestamp_from_timetag(local_timetag); |
| } |
| |
| // start with duplicating. |
| void set_app_duplicating() |
| { |
| _server->stop(false); |
| dsn::replication::destroy_replica(_replica); |
| |
| dsn::app_info app_info; |
| app_info.app_type = "pegasus"; |
| app_info.duplicating = true; |
| _replica = |
| dsn::replication::create_test_replica(_replica_stub, _gpid, app_info, "./", false); |
| _server = dsn::make_unique<pegasus_server_impl>(_replica); |
| |
| SetUp(); |
| } |
| |
| int db_get(dsn::string_view raw_key, db_get_context *get_ctx) |
| { |
| return _write_impl->db_get(raw_key, get_ctx); |
| } |
| |
| void single_set(dsn::blob raw_key, dsn::blob user_value) |
| { |
| dsn::apps::update_request put; |
| put.key = raw_key; |
| put.value = user_value; |
| db_write_context write_ctx; |
| dsn::apps::update_response put_resp; |
| _write_impl->batch_put(write_ctx, put, put_resp); |
| ASSERT_EQ(_write_impl->batch_commit(0), 0); |
| } |
| }; |
| |
| TEST_F(pegasus_write_service_impl_test, put_verify_timetag) |
| { |
| set_app_duplicating(); |
| |
| dsn::blob raw_key; |
| pegasus::pegasus_generate_key( |
| raw_key, dsn::string_view("hash_key"), dsn::string_view("sort_key")); |
| std::string value = "value"; |
| int64_t decree = 10; |
| |
| /// insert timestamp 10 |
| uint64_t timestamp = 10; |
| auto ctx = db_write_context::create(decree, timestamp); |
| ASSERT_EQ(0, _write_impl->db_write_batch_put_ctx(ctx, raw_key, value, 0)); |
| ASSERT_EQ(0, _write_impl->db_write(ctx.decree)); |
| _write_impl->clear_up_batch_states(decree, 0); |
| ASSERT_EQ(read_timestamp_from(raw_key), timestamp); |
| |
| /// insert timestamp 15, which overwrites the previous record |
| timestamp = 15; |
| ctx = db_write_context::create(decree, timestamp); |
| ASSERT_EQ(0, _write_impl->db_write_batch_put_ctx(ctx, raw_key, value, 0)); |
| ASSERT_EQ(0, _write_impl->db_write(ctx.decree)); |
| _write_impl->clear_up_batch_states(decree, 0); |
| ASSERT_EQ(read_timestamp_from(raw_key), timestamp); |
| |
| /// insert timestamp 15 from remote, which will overwrite the previous record, |
| /// since its cluster id is larger (current cluster_id=1) |
| timestamp = 15; |
| ctx.remote_timetag = pegasus::generate_timetag(timestamp, 2, false); |
| ctx.verify_timetag = true; |
| ASSERT_EQ(0, _write_impl->db_write_batch_put_ctx(ctx, raw_key, value + "_new", 0)); |
| ASSERT_EQ(0, _write_impl->db_write(ctx.decree)); |
| _write_impl->clear_up_batch_states(decree, 0); |
| ASSERT_EQ(read_timestamp_from(raw_key), timestamp); |
| std::string raw_value; |
| dsn::blob user_value; |
| rocksdb::Status s = |
| _write_impl->_db->Get(_write_impl->_rd_opts, utils::to_rocksdb_slice(raw_key), &raw_value); |
| pegasus_extract_user_data(_write_impl->_pegasus_data_version, std::move(raw_value), user_value); |
| ASSERT_EQ(user_value.to_string(), "value_new"); |
| |
| // write retry |
| ASSERT_EQ(0, _write_impl->db_write_batch_put_ctx(ctx, raw_key, value + "_new", 0)); |
| ASSERT_EQ(0, _write_impl->db_write(ctx.decree)); |
| _write_impl->clear_up_batch_states(decree, 0); |
| |
| /// insert timestamp 16 from local, which will overwrite the remote record, |
| /// since its timestamp is larger |
| timestamp = 16; |
| ctx = db_write_context::create(decree, timestamp); |
| ASSERT_EQ(0, _write_impl->db_write_batch_put_ctx(ctx, raw_key, value, 0)); |
| ASSERT_EQ(0, _write_impl->db_write(ctx.decree)); |
| _write_impl->clear_up_batch_states(decree, 0); |
| ASSERT_EQ(read_timestamp_from(raw_key), timestamp); |
| |
| // write retry |
| ASSERT_EQ(0, _write_impl->db_write_batch_put_ctx(ctx, raw_key, value, 0)); |
| ASSERT_EQ(0, _write_impl->db_write(ctx.decree)); |
| _write_impl->clear_up_batch_states(decree, 0); |
| } |
| |
| // verify timetag on data version v0 |
| TEST_F(pegasus_write_service_impl_test, verify_timetag_compatible_with_version_0) |
| { |
| dsn::fail::setup(); |
| dsn::fail::cfg("db_get", "100%1*return()"); |
| // if db_write_batch_put_ctx invokes db_get, this test must fail. |
| |
| const_cast<uint32_t &>(_write_impl->_pegasus_data_version) = 0; // old version |
| |
| dsn::blob raw_key; |
| pegasus::pegasus_generate_key( |
| raw_key, dsn::string_view("hash_key"), dsn::string_view("sort_key")); |
| std::string value = "value"; |
| int64_t decree = 10; |
| uint64_t timestamp = 10; |
| |
| auto ctx = db_write_context::create_duplicate(decree, timestamp, true); |
| ASSERT_EQ(0, _write_impl->db_write_batch_put_ctx(ctx, raw_key, value, 0)); |
| ASSERT_EQ(0, _write_impl->db_write(ctx.decree)); |
| _write_impl->clear_up_batch_states(decree, 0); |
| |
| dsn::fail::teardown(); |
| } |
| |
| class incr_test : public pegasus_write_service_impl_test |
| { |
| public: |
| void SetUp() override |
| { |
| pegasus_write_service_impl_test::SetUp(); |
| pegasus::pegasus_generate_key( |
| req.key, dsn::string_view("hash_key"), dsn::string_view("sort_key")); |
| } |
| |
| dsn::apps::incr_request req; |
| dsn::apps::incr_response resp; |
| }; |
| |
| TEST_F(incr_test, incr_on_absent_record) |
| { |
| // ensure key is absent |
| db_get_context get_ctx; |
| db_get(req.key, &get_ctx); |
| ASSERT_FALSE(get_ctx.found); |
| |
| req.increment = 100; |
| _write_impl->incr(0, req, resp); |
| ASSERT_EQ(resp.new_value, 100); |
| |
| db_get(req.key, &get_ctx); |
| ASSERT_TRUE(get_ctx.found); |
| } |
| |
| TEST_F(incr_test, negative_incr_and_zero_incr) |
| { |
| req.increment = -100; |
| ASSERT_EQ(0, _write_impl->incr(0, req, resp)); |
| ASSERT_EQ(resp.new_value, -100); |
| |
| req.increment = -1; |
| ASSERT_EQ(0, _write_impl->incr(0, req, resp)); |
| ASSERT_EQ(resp.new_value, -101); |
| |
| req.increment = 0; |
| ASSERT_EQ(0, _write_impl->incr(0, req, resp)); |
| ASSERT_EQ(resp.new_value, -101); |
| } |
| |
| TEST_F(incr_test, invalid_incr) |
| { |
| single_set(req.key, dsn::blob::create_from_bytes("abc")); |
| |
| req.increment = 10; |
| _write_impl->incr(1, req, resp); |
| ASSERT_EQ(resp.error, rocksdb::Status::kInvalidArgument); |
| ASSERT_EQ(resp.new_value, 0); |
| |
| single_set(req.key, dsn::blob::create_from_bytes("100")); |
| |
| req.increment = std::numeric_limits<int64_t>::max(); |
| _write_impl->incr(1, req, resp); |
| ASSERT_EQ(resp.error, rocksdb::Status::kInvalidArgument); |
| ASSERT_EQ(resp.new_value, 100); |
| } |
| |
| } // namespace server |
| } // namespace pegasus |