refactor: use db_get to implement incr (#598)

diff --git a/src/server/pegasus_read_service.h b/src/server/pegasus_read_service.h
index d38604a..67b6a80 100644
--- a/src/server/pegasus_read_service.h
+++ b/src/server/pegasus_read_service.h
@@ -1,3 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
 #pragma once
 #include <iostream>
 #include <dsn/dist/replication/replication_app_base.h>
diff --git a/src/server/pegasus_write_service_impl.h b/src/server/pegasus_write_service_impl.h
index 909832b..e202ff1 100644
--- a/src/server/pegasus_write_service_impl.h
+++ b/src/server/pegasus_write_service_impl.h
@@ -177,21 +177,25 @@
         resp.decree = decree;
         resp.server = _primary_address;
 
-        rocksdb::Slice raw_key(update.key.data(), update.key.length());
-        std::string raw_value;
+        dsn::string_view raw_key(update.key.data(), update.key.length());
         int64_t new_value = 0;
         uint32_t new_expire_ts = 0;
-        rocksdb::Status s = _db->Get(_rd_opts, raw_key, &raw_value);
-        if (s.ok()) {
-            uint32_t old_expire_ts = pegasus_extract_expire_ts(_pegasus_data_version, raw_value);
-            if (check_if_ts_expired(utils::epoch_now(), old_expire_ts)) {
+        db_get_context get_ctx;
+        int err = db_get(raw_key, &get_ctx);
+        if (err == 0) {
+            if (!get_ctx.found) {
+                // old value is not found, set to 0 before increment
+                new_value = update.increment;
+                new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
+            } else if (get_ctx.expired) {
                 // ttl timeout, set to 0 before increment
                 _pfc_recent_expire_count->increment();
                 new_value = update.increment;
                 new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
             } else {
                 ::dsn::blob old_value;
-                pegasus_extract_user_data(_pegasus_data_version, std::move(raw_value), old_value);
+                pegasus_extract_user_data(
+                    _pegasus_data_version, std::move(get_ctx.raw_value), old_value);
                 if (old_value.length() == 0) {
                     // empty old value, set to 0 before increment
                     new_value = update.increment;
@@ -223,29 +227,14 @@
                     }
                 }
                 // set new ttl
-                if (update.expire_ts_seconds == 0)
-                    new_expire_ts = old_expire_ts;
-                else if (update.expire_ts_seconds < 0)
+                if (update.expire_ts_seconds == 0) {
+                    new_expire_ts = get_ctx.expire_ts;
+                } else if (update.expire_ts_seconds < 0) {
                     new_expire_ts = 0;
-                else // update.expire_ts_seconds > 0
+                } else { // update.expire_ts_seconds > 0
                     new_expire_ts = update.expire_ts_seconds;
+                }
             }
-        } else if (s.IsNotFound()) {
-            // old value is not found, set to 0 before increment
-            new_value = update.increment;
-            new_expire_ts = update.expire_ts_seconds > 0 ? update.expire_ts_seconds : 0;
-        } else {
-            // read old value failed
-            ::dsn::blob hash_key, sort_key;
-            pegasus_restore_key(::dsn::blob(raw_key.data(), 0, raw_key.size()), hash_key, sort_key);
-            derror_rocksdb("Get for Incr",
-                           s.ToString(),
-                           "decree: {}, hash_key: {}, sort_key: {}",
-                           decree,
-                           utils::c_escape_string(hash_key),
-                           utils::c_escape_string(sort_key));
-            resp.error = s.code();
-            return resp.error;
         }
 
         resp.error =
@@ -675,7 +664,10 @@
         return status.code();
     }
 
-    // The resulted `expire_ts` is -1 if record is expired.
+    /// Calls RocksDB Get and store the result into `db_get_context`.
+    /// \returns 0 if Get succeeded. On failure, a non-zero rocksdb status code is returned.
+    /// \result ctx.expired=true if record expired. Still 0 is returned.
+    /// \result ctx.found=false if record is not found. Still 0 is returned.
     int db_get(dsn::string_view raw_key,
                /*out*/ db_get_context *ctx)
     {
diff --git a/src/server/test/pegasus_write_service_impl_test.cpp b/src/server/test/pegasus_write_service_impl_test.cpp
index 7ee0a1e..7f1aec0 100644
--- a/src/server/test/pegasus_write_service_impl_test.cpp
+++ b/src/server/test/pegasus_write_service_impl_test.cpp
@@ -52,6 +52,22 @@
 
         SetUp();
     }
+
+    int db_get(dsn::string_view raw_key, db_get_context *get_ctx)
+    {
+        return _write_impl->db_get(raw_key, get_ctx);
+    }
+
+    void single_set(dsn::blob raw_key, dsn::blob user_value)
+    {
+        dsn::apps::update_request put;
+        put.key = raw_key;
+        put.value = user_value;
+        db_write_context write_ctx;
+        dsn::apps::update_response put_resp;
+        _write_impl->batch_put(write_ctx, put, put_resp);
+        ASSERT_EQ(_write_impl->batch_commit(0), 0);
+    }
 };
 
 TEST_F(pegasus_write_service_impl_test, put_verify_timetag)
@@ -140,5 +156,66 @@
     dsn::fail::teardown();
 }
 
+class incr_test : public pegasus_write_service_impl_test
+{
+public:
+    void SetUp() override
+    {
+        pegasus_write_service_impl_test::SetUp();
+        pegasus::pegasus_generate_key(
+            req.key, dsn::string_view("hash_key"), dsn::string_view("sort_key"));
+    }
+
+    dsn::apps::incr_request req;
+    dsn::apps::incr_response resp;
+};
+
+TEST_F(incr_test, incr_on_absent_record)
+{
+    // ensure key is absent
+    db_get_context get_ctx;
+    db_get(req.key, &get_ctx);
+    ASSERT_FALSE(get_ctx.found);
+
+    req.increment = 100;
+    _write_impl->incr(0, req, resp);
+    ASSERT_EQ(resp.new_value, 100);
+
+    db_get(req.key, &get_ctx);
+    ASSERT_TRUE(get_ctx.found);
+}
+
+TEST_F(incr_test, negative_incr_and_zero_incr)
+{
+    req.increment = -100;
+    ASSERT_EQ(0, _write_impl->incr(0, req, resp));
+    ASSERT_EQ(resp.new_value, -100);
+
+    req.increment = -1;
+    ASSERT_EQ(0, _write_impl->incr(0, req, resp));
+    ASSERT_EQ(resp.new_value, -101);
+
+    req.increment = 0;
+    ASSERT_EQ(0, _write_impl->incr(0, req, resp));
+    ASSERT_EQ(resp.new_value, -101);
+}
+
+TEST_F(incr_test, invalid_incr)
+{
+    single_set(req.key, dsn::blob::create_from_bytes("abc"));
+
+    req.increment = 10;
+    _write_impl->incr(1, req, resp);
+    ASSERT_EQ(resp.error, rocksdb::Status::kInvalidArgument);
+    ASSERT_EQ(resp.new_value, 0);
+
+    single_set(req.key, dsn::blob::create_from_bytes("100"));
+
+    req.increment = std::numeric_limits<int64_t>::max();
+    _write_impl->incr(1, req, resp);
+    ASSERT_EQ(resp.error, rocksdb::Status::kInvalidArgument);
+    ASSERT_EQ(resp.new_value, 100);
+}
+
 } // namespace server
 } // namespace pegasus