refactor: use db_get to implement incr (#598)
diff --git a/src/server/pegasus_read_service.h b/src/server/pegasus_read_service.h
index d38604a..67b6a80 100644
--- a/src/server/pegasus_read_service.h
+++ b/src/server/pegasus_read_service.h
@@ -1,3 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
#pragma once
#include <iostream>
#include <dsn/dist/replication/replication_app_base.h>
diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h
index 909832b..e202ff1 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -177,21 +177,25 @@
resp.decree = decree;
resp.server = _primary_address;
- rocksdb::Slice raw_key(update.key.data(), update.key.length());
- std::string raw_value;
+ dsn::string_view raw_key(update.key.data(), update.key.length());
int64_t new_value = 0;
uint32_t new_expire_ts = 0;
- rocksdb::Status s = _db->Get(_rd_opts, raw_key, &raw_value);
- if (s.ok()) {
- uint32_t old_expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, raw_value);
- if (check_if_ts_expired(utils::epoch_now(), old_expire_ts)) {
+ db_get_context get_ctx;
+ int err = db_get(raw_key, &get_ctx);
+ if (err == 0) {
+ if (!get_ctx.found) {
+ // old value is not found, set to 0 before increment
+ new_value = update.increment;
+ new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
+ } else if (get_ctx.expired) {
// ttl timeout, set to 0 before increment
_pfc_recent_expire_count->increment();
new_value = update.increment;
new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
} else {
::dsn::blob old_value;
- pegasus_extract_user_data(_pegasus_data_version, std::move(raw_value), old_value);
+ pegasus_extract_user_data(
+ _pegasus_data_version, std::move(get_ctx.raw_value), old_value);
if (old_value.length() == 0) {
// empty old value, set to 0 before increment
new_value = update.increment;
@@ -223,29 +227,14 @@
}
}
// set new ttl
- if (update.expire_ts_seconds == 0)
- new_expire_ts = old_expire_ts;
- else if (update.expire_ts_seconds < 0)
+ if (update.expire_ts_seconds == 0) {
+ new_expire_ts = get_ctx.expire_ts;
+ } else if (update.expire_ts_seconds < 0) {
new_expire_ts = 0;
- else // update.expire_ts_seconds > 0
+ } else { // update.expire_ts_seconds > 0
new_expire_ts = update.expire_ts_seconds;
+ }
}
- } else if (s.IsNotFound()) {
- // old value is not found, set to 0 before increment
- new_value = update.increment;
- new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
- } else {
- // read old value failed
- ::dsn::blob hash_key, sort_key;
- pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key);
- derror_rocksdb("Get for Incr",
- s.ToString(),
- "decree: {}, hash_key: {}, sort_key: {}",
- decree,
- utils::c_escape_string(hash_key),
- utils::c_escape_string(sort_key));
- resp.error = s.code();
- return resp.error;
}
resp.error =
@@ -675,7 +664,10 @@
return status.code();
}
- // The resulted `expire_ts` is -1 if record is expired.
+ /// Calls RocksDB Get and store the result into `db_get_context`.
+ /// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned.
+ /// \result ctx.expired=true if record expired. Still 0 is returned.
+ /// \result ctx.found=false if record is not found. Still 0 is returned.
int db_get(dsn::string_view raw_key,
/*out*/ db_get_context *ctx)
{
diff --git a/src/server/test/pegasus_write_service_impl_test.cpp b/src/server/test/pegasus_write_service_impl_test.cpp
index 7ee0a1e..7f1aec0 100644
--- a/src/server/test/pegasus_write_service_impl_test.cpp
+++ b/src/server/test/pegasus_write_service_impl_test.cpp
@@ -52,6 +52,22 @@
SetUp();
}
+
+ int db_get(dsn::string_view raw_key, db_get_context *get_ctx)
+ {
+ return _write_impl->db_get(raw_key, get_ctx);
+ }
+
+ void single_set(dsn::blob raw_key, dsn::blob user_value)
+ {
+ dsn::apps::update_request put;
+ put.key = raw_key;
+ put.value = user_value;
+ db_write_context write_ctx;
+ dsn::apps::update_response put_resp;
+ _write_impl->batch_put(write_ctx, put, put_resp);
+ ASSERT_EQ(_write_impl->batch_commit(0), 0);
+ }
};
TEST_F(pegasus_write_service_impl_test, put_verify_timetag)
@@ -140,5 +156,66 @@
dsn::fail::teardown();
}
+class incr_test : public pegasus_write_service_impl_test
+{
+public:
+ void SetUp() override
+ {
+ pegasus_write_service_impl_test::SetUp();
+ pegasus::pegasus_generate_key(
+ req.key, dsn::string_view("hash_key"), dsn::string_view("sort_key"));
+ }
+
+ dsn::apps::incr_request req;
+ dsn::apps::incr_response resp;
+};
+
+TEST_F(incr_test, incr_on_absent_record)
+{
+ // ensure key is absent
+ db_get_context get_ctx;
+ db_get(req.key, &get_ctx);
+ ASSERT_FALSE(get_ctx.found);
+
+ req.increment = 100;
+ _write_impl->incr(0, req, resp);
+ ASSERT_EQ(resp.new_value, 100);
+
+ db_get(req.key, &get_ctx);
+ ASSERT_TRUE(get_ctx.found);
+}
+
+TEST_F(incr_test, negative_incr_and_zero_incr)
+{
+ req.increment = -100;
+ ASSERT_EQ(0, _write_impl->incr(0, req, resp));
+ ASSERT_EQ(resp.new_value, -100);
+
+ req.increment = -1;
+ ASSERT_EQ(0, _write_impl->incr(0, req, resp));
+ ASSERT_EQ(resp.new_value, -101);
+
+ req.increment = 0;
+ ASSERT_EQ(0, _write_impl->incr(0, req, resp));
+ ASSERT_EQ(resp.new_value, -101);
+}
+
+TEST_F(incr_test, invalid_incr)
+{
+ single_set(req.key, dsn::blob::create_from_bytes("abc"));
+
+ req.increment = 10;
+ _write_impl->incr(1, req, resp);
+ ASSERT_EQ(resp.error, rocksdb::Status::kInvalidArgument);
+ ASSERT_EQ(resp.new_value, 0);
+
+ single_set(req.key, dsn::blob::create_from_bytes("100"));
+
+ req.increment = std::numeric_limits<int64_t>::max();
+ _write_impl->incr(1, req, resp);
+ ASSERT_EQ(resp.error, rocksdb::Status::kInvalidArgument);
+ ASSERT_EQ(resp.new_value, 100);
+}
+
} // namespace server
} // namespace pegasus