blob: b0d7f39b63a958e74dda3a6e49c874f1e2df264c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <cctype>
#include <algorithm>
#include <string>
#include <stdint.h>
#include <dsn/tool-api/auto_codes.h>
#include <dsn/tool-api/group_address.h>
#include <dsn/dist/replication/replication_other_types.h>
#include <dsn/cpp/serialization_helper/dsn.layer2_types.h>
#include <rrdb/rrdb.code.definition.h>
#include <pegasus/error.h>
#include "pegasus_client_impl.h"
#include "base/pegasus_const.h"
using namespace ::dsn;
namespace pegasus {
namespace client {
#define ROCSKDB_ERROR_START -1000
std::unordered_map<int, std::string> pegasus_client_impl::_client_error_to_string;
std::unordered_map<int, int> pegasus_client_impl::_server_error_to_client;
pegasus_client_impl::pegasus_client_impl(const char *cluster_name, const char *app_name)
: _cluster_name(cluster_name), _app_name(app_name)
{
std::vector<dsn::rpc_address> meta_servers;
dsn::replication::replica_helper::load_meta_servers(
meta_servers, PEGASUS_CLUSTER_SECTION_NAME.c_str(), cluster_name);
dassert(meta_servers.size() > 0, "");
_meta_server.assign_group("meta-servers");
_meta_server.group_address()->add_list(meta_servers);
_client = new ::dsn::apps::rrdb_client(cluster_name, meta_servers, app_name);
}
pegasus_client_impl::~pegasus_client_impl() { delete _client; }
const char *pegasus_client_impl::get_cluster_name() const { return _cluster_name.c_str(); }
const char *pegasus_client_impl::get_app_name() const { return _app_name.c_str(); }
int pegasus_client_impl::set(const std::string &hash_key,
const std::string &sort_key,
const std::string &value,
int timeout_milliseconds,
int ttl_seconds,
internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int err, internal_info &&_info) {
ret = err;
if (info != nullptr)
(*info) = std::move(_info);
op_completed.notify();
};
async_set(hash_key, sort_key, value, std::move(callback), timeout_milliseconds, ttl_seconds);
op_completed.wait();
return ret;
}
void pegasus_client_impl::async_set(const std::string &hash_key,
const std::string &sort_key,
const std::string &value,
async_set_callback_t &&callback,
int timeout_milliseconds,
int ttl_seconds)
{
// check params
if (hash_key.size() >= UINT16_MAX) {
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d",
(int)hash_key.size());
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, internal_info());
return;
}
::dsn::apps::update_request req;
pegasus_generate_key(req.key, hash_key, sort_key);
req.value.assign(value.c_str(), 0, value.size());
if (ttl_seconds == 0)
req.expire_ts_seconds = 0;
else
req.expire_ts_seconds = ttl_seconds + utils::epoch_now();
auto partition_hash = pegasus_key_hash(req.key);
// wrap the user defined callback function, generate a new callback function.
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp)
{
if (user_callback == nullptr) {
return;
}
internal_info info;
::dsn::apps::update_response response;
if (err == ::dsn::ERR_OK) {
::dsn::unmarshall(resp, response);
info.app_id = response.app_id;
info.partition_index = response.partition_index;
info.decree = response.decree;
info.server = response.server;
}
auto ret = get_client_error(
(err == ::dsn::ERR_OK) ? get_rocksdb_server_error(response.error) : int(err));
user_callback(ret, std::move(info));
};
_client->put(req,
std::move(new_callback),
std::chrono::milliseconds(timeout_milliseconds),
partition_hash);
}
int pegasus_client_impl::multi_set(const std::string &hash_key,
const std::map<std::string, std::string> &kvs,
int timeout_milliseconds,
int ttl_seconds,
internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int err, internal_info &&_info) {
ret = err;
if (info != nullptr)
(*info) = std::move(_info);
op_completed.notify();
};
async_multi_set(hash_key, kvs, std::move(callback), timeout_milliseconds, ttl_seconds);
op_completed.wait();
return ret;
}
void pegasus_client_impl::async_multi_set(const std::string &hash_key,
const std::map<std::string, std::string> &kvs,
async_multi_set_callback_t &&callback,
int timeout_milliseconds,
int ttl_seconds)
{
// check params
if (hash_key.size() == 0) {
derror("invalid hash key: hash key should not be empty for multi_set");
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, internal_info());
return;
}
if (hash_key.size() >= UINT16_MAX) {
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d",
(int)hash_key.size());
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, internal_info());
return;
}
if (kvs.empty()) {
derror("invalid kvs: kvs should not be empty");
if (callback != nullptr)
callback(PERR_INVALID_VALUE, internal_info());
return;
}
::dsn::apps::multi_put_request req;
req.hash_key = ::dsn::blob(hash_key.data(), 0, hash_key.size());
for (auto &kv : kvs) {
::dsn::apps::key_value kv_blob;
kv_blob.key = ::dsn::blob(kv.first.data(), 0, kv.first.size());
kv_blob.value = ::dsn::blob(kv.second.data(), 0, kv.second.size());
req.kvs.emplace_back(std::move(kv_blob));
}
if (ttl_seconds == 0)
req.expire_ts_seconds = 0;
else
req.expire_ts_seconds = ttl_seconds + utils::epoch_now();
::dsn::blob tmp_key;
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
// wrap the user-defined-callback-function, generate a new callback function.
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp)
{
if (user_callback == nullptr) {
return;
}
internal_info info;
::dsn::apps::update_response response;
if (err == ::dsn::ERR_OK) {
::dsn::unmarshall(resp, response);
info.app_id = response.app_id;
info.partition_index = response.partition_index;
info.decree = response.decree;
info.server = response.server;
}
auto ret = get_client_error(
(err == ::dsn::ERR_OK) ? get_rocksdb_server_error(response.error) : int(err));
user_callback(ret, std::move(info));
};
_client->multi_put(req,
std::move(new_callback),
std::chrono::milliseconds(timeout_milliseconds),
partition_hash);
}
int pegasus_client_impl::get(const std::string &hash_key,
const std::string &sort_key,
std::string &value,
int timeout_milliseconds,
internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int err, std::string &&str, internal_info &&_info) {
ret = err;
value = std::move(str);
if (info != nullptr)
(*info) = std::move(_info);
op_completed.notify();
};
async_get(hash_key, sort_key, std::move(callback), timeout_milliseconds);
op_completed.wait();
return ret;
}
void pegasus_client_impl::async_get(const std::string &hash_key,
const std::string &sort_key,
async_get_callback_t &&callback,
int timeout_milliseconds)
{
// check params
if (hash_key.size() >= UINT16_MAX) {
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d",
(int)hash_key.size());
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, std::string(), internal_info());
return;
}
::dsn::blob req;
pegasus_generate_key(req, hash_key, sort_key);
auto partition_hash = pegasus_key_hash(req);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp)
{
if (user_callback == nullptr) {
return;
}
std::string value;
internal_info info;
dsn::apps::read_response response;
if (err == ::dsn::ERR_OK) {
::dsn::unmarshall(resp, response);
if (response.error == 0) {
value.assign(response.value.data(), response.value.length());
}
info.app_id = response.app_id;
info.partition_index = response.partition_index;
info.server = response.server;
}
int ret =
get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err));
user_callback(ret, std::move(value), std::move(info));
};
_client->get(req,
std::move(new_callback),
std::chrono::milliseconds(timeout_milliseconds),
partition_hash);
}
int pegasus_client_impl::multi_get(const std::string &hash_key,
const std::set<std::string> &sort_keys,
std::map<std::string, std::string> &values,
int max_fetch_count,
int max_fetch_size,
int timeout_milliseconds,
internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback =
[&](int err, std::map<std::string, std::string> &&_values, internal_info &&_info) {
ret = err;
if (info != nullptr)
(*info) = std::move(_info);
values = std::move(_values);
op_completed.notify();
};
async_multi_get(hash_key,
sort_keys,
std::move(callback),
max_fetch_count,
max_fetch_size,
timeout_milliseconds);
op_completed.wait();
return ret;
}
void pegasus_client_impl::async_multi_get(const std::string &hash_key,
const std::set<std::string> &sort_keys,
async_multi_get_callback_t &&callback,
int max_fetch_count,
int max_fetch_size,
int timeout_milliseconds)
{
// check params
if (hash_key.size() == 0) {
derror("invalid hash key: hash key should not be empty");
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, std::map<std::string, std::string>(), internal_info());
return;
}
if (hash_key.size() >= UINT16_MAX) {
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d",
(int)hash_key.size());
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, std::map<std::string, std::string>(), internal_info());
return;
}
::dsn::apps::multi_get_request req;
req.hash_key = ::dsn::blob(hash_key.data(), 0, hash_key.size());
req.max_kv_count = max_fetch_count;
req.max_kv_size = max_fetch_size;
req.start_inclusive = true;
req.stop_inclusive = false;
for (auto &sort_key : sort_keys) {
req.sort_keys.emplace_back(sort_key.data(), 0, sort_key.size());
}
::dsn::blob tmp_key;
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp)
{
if (user_callback == nullptr) {
return;
}
std::map<std::string, std::string> values;
internal_info info;
::dsn::apps::multi_get_response response;
if (err == ::dsn::ERR_OK) {
::unmarshall(resp, response);
info.app_id = response.app_id;
info.partition_index = response.partition_index;
info.server = response.server;
for (auto &kv : response.kvs)
values.emplace(std::string(kv.key.data(), kv.key.length()),
std::string(kv.value.data(), kv.value.length()));
}
int ret =
get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err));
user_callback(ret, std::move(values), std::move(info));
};
_client->multi_get(req,
std::move(new_callback),
std::chrono::milliseconds(timeout_milliseconds),
partition_hash);
}
int pegasus_client_impl::multi_get(const std::string &hash_key,
const std::string &start_sortkey,
const std::string &stop_sortkey,
const multi_get_options &options,
std::map<std::string, std::string> &values,
int max_fetch_count,
int max_fetch_size,
int timeout_milliseconds,
internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback =
[&](int err, std::map<std::string, std::string> &&_values, internal_info &&_info) {
ret = err;
if (info != nullptr)
(*info) = std::move(_info);
values = std::move(_values);
op_completed.notify();
};
async_multi_get(hash_key,
start_sortkey,
stop_sortkey,
options,
std::move(callback),
max_fetch_count,
max_fetch_size,
timeout_milliseconds);
op_completed.wait();
return ret;
}
void pegasus_client_impl::async_multi_get(const std::string &hash_key,
const std::string &start_sortkey,
const std::string &stop_sortkey,
const multi_get_options &options,
async_multi_get_callback_t &&callback,
int max_fetch_count,
int max_fetch_size,
int timeout_milliseconds)
{
// check params
if (hash_key.size() == 0) {
derror("invalid hash key: hash key should not be empty");
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, std::map<std::string, std::string>(), internal_info());
return;
}
if (hash_key.size() >= UINT16_MAX) {
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d",
(int)hash_key.size());
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, std::map<std::string, std::string>(), internal_info());
return;
}
::dsn::apps::multi_get_request req;
req.hash_key = ::dsn::blob(hash_key.data(), 0, hash_key.size());
req.start_sortkey = ::dsn::blob(start_sortkey.data(), 0, start_sortkey.size());
req.stop_sortkey = ::dsn::blob(stop_sortkey.data(), 0, stop_sortkey.size());
req.start_inclusive = options.start_inclusive;
req.stop_inclusive = options.stop_inclusive;
req.max_kv_count = max_fetch_count;
req.max_kv_size = max_fetch_size;
req.no_value = options.no_value;
req.reverse = options.reverse;
req.sort_key_filter_type = (dsn::apps::filter_type::type)options.sort_key_filter_type;
req.sort_key_filter_pattern = ::dsn::blob(
options.sort_key_filter_pattern.data(), 0, options.sort_key_filter_pattern.size());
::dsn::blob tmp_key;
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp)
{
if (user_callback == nullptr) {
return;
}
std::map<std::string, std::string> values;
internal_info info;
::dsn::apps::multi_get_response response;
if (err == ::dsn::ERR_OK) {
::unmarshall(resp, response);
info.app_id = response.app_id;
info.partition_index = response.partition_index;
info.server = response.server;
for (auto &kv : response.kvs)
values.emplace(std::string(kv.key.data(), kv.key.length()),
std::string(kv.value.data(), kv.value.length()));
}
int ret =
get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err));
user_callback(ret, std::move(values), std::move(info));
};
_client->multi_get(req,
std::move(new_callback),
std::chrono::milliseconds(timeout_milliseconds),
partition_hash);
}
int pegasus_client_impl::multi_get_sortkeys(const std::string &hash_key,
std::set<std::string> &sort_keys,
int max_fetch_count,
int max_fetch_size,
int timeout_milliseconds,
internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int err, std::set<std::string> &&_sort_keys, internal_info &&_info) {
ret = err;
if (info != nullptr)
(*info) = std::move(_info);
sort_keys = std::move(_sort_keys);
op_completed.notify();
};
async_multi_get_sortkeys(
hash_key, std::move(callback), max_fetch_count, max_fetch_size, timeout_milliseconds);
op_completed.wait();
return ret;
}
void pegasus_client_impl::async_multi_get_sortkeys(const std::string &hash_key,
async_multi_get_sortkeys_callback_t &&callback,
int max_fetch_count,
int max_fetch_size,
int timeout_milliseconds)
{
// check params
if (hash_key.size() == 0) {
derror("invalid hash key: hash key should not be empty for multi_get_sortkeys");
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, std::set<std::string>(), internal_info());
return;
}
if (hash_key.size() >= UINT16_MAX) {
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d",
(int)hash_key.size());
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, std::set<std::string>(), internal_info());
return;
}
::dsn::apps::multi_get_request req;
req.hash_key = ::dsn::blob(hash_key.data(), 0, hash_key.size());
req.max_kv_count = max_fetch_count;
req.max_kv_size = max_fetch_size;
req.no_value = true;
::dsn::blob tmp_key;
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp)
{
if (user_callback == nullptr) {
return;
}
std::set<std::string> sort_keys;
internal_info info;
::dsn::apps::multi_get_response response;
if (err == ::dsn::ERR_OK) {
::unmarshall(resp, response);
info.app_id = response.app_id;
info.partition_index = response.partition_index;
info.server = response.server;
for (auto &kv : response.kvs)
sort_keys.insert(std::string(kv.key.data(), kv.key.length()));
}
int ret =
get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err));
user_callback(ret, std::move(sort_keys), std::move(info));
};
_client->multi_get(req,
std::move(new_callback),
std::chrono::milliseconds(timeout_milliseconds),
partition_hash);
}
int pegasus_client_impl::exist(const std::string &hash_key,
const std::string &sort_key,
int timeout_milliseconds,
internal_info *info)
{
int ttl_seconds;
return ttl(hash_key, sort_key, ttl_seconds, timeout_milliseconds, info);
}
int pegasus_client_impl::sortkey_count(const std::string &hash_key,
int64_t &count,
int timeout_milliseconds,
internal_info *info)
{
// check params
if (hash_key.size() == 0) {
derror("invalid hash key: hash key should not be empty for sortkey_count");
return PERR_INVALID_HASH_KEY;
}
if (hash_key.size() >= UINT16_MAX) {
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d",
(int)hash_key.size());
return PERR_INVALID_HASH_KEY;
}
::dsn::blob tmp_key;
pegasus_generate_key(tmp_key, hash_key, std::string());
auto partition_hash = pegasus_key_hash(tmp_key);
auto pr = _client->sortkey_count_sync(::dsn::blob(hash_key.data(), 0, hash_key.length()),
std::chrono::milliseconds(timeout_milliseconds),
partition_hash);
if (pr.first == ERR_OK && pr.second.error == 0) {
count = pr.second.count;
}
if (info != nullptr) {
if (pr.first == ERR_OK) {
info->app_id = pr.second.app_id;
info->partition_index = pr.second.partition_index;
info->decree = -1;
info->server = pr.second.server;
} else {
info->app_id = -1;
info->partition_index = -1;
info->decree = -1;
}
}
return get_client_error(pr.first == ERR_OK ? get_rocksdb_server_error(pr.second.error)
: int(pr.first));
}
int pegasus_client_impl::del(const std::string &hash_key,
const std::string &sort_key,
int timeout_milliseconds,
internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int err, internal_info &&_info) {
ret = err;
if (info != nullptr)
(*info) = std::move(_info);
op_completed.notify();
};
async_del(hash_key, sort_key, std::move(callback), timeout_milliseconds);
op_completed.wait();
return ret;
}
void pegasus_client_impl::async_del(const std::string &hash_key,
const std::string &sort_key,
async_del_callback_t &&callback,
int timeout_milliseconds)
{
// check params
if (hash_key.size() >= UINT16_MAX) {
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d",
(int)hash_key.size());
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, internal_info());
return;
}
::dsn::blob req;
pegasus_generate_key(req, hash_key, sort_key);
auto partition_hash = pegasus_key_hash(req);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp)
{
if (user_callback == nullptr) {
return;
}
::dsn::apps::update_response response;
internal_info info;
if (err == ::dsn::ERR_OK) {
::dsn::unmarshall(resp, response);
info.app_id = response.app_id;
info.partition_index = response.partition_index;
info.decree = response.decree;
info.server = response.server;
}
int ret =
get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err));
user_callback(ret, std::move(info));
};
_client->remove(req,
std::move(new_callback),
std::chrono::milliseconds(timeout_milliseconds),
partition_hash);
}
int pegasus_client_impl::multi_del(const std::string &hash_key,
const std::set<std::string> &sort_keys,
int64_t &deleted_count,
int timeout_milliseconds,
internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int err, int64_t _deleted_count, internal_info &&_info) {
ret = err;
deleted_count = _deleted_count;
if (info != nullptr)
(*info) = std::move(_info);
op_completed.notify();
};
async_multi_del(hash_key, sort_keys, std::move(callback), timeout_milliseconds);
op_completed.wait();
return ret;
}
void pegasus_client_impl::async_multi_del(const std::string &hash_key,
const std::set<std::string> &sort_keys,
async_multi_del_callback_t &&callback,
int timeout_milliseconds)
{
// check params
if (hash_key.size() == 0) {
derror("invalid hash key: hash key should not be empty for multi_del");
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, 0, internal_info());
return;
}
if (hash_key.size() >= UINT16_MAX) {
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d",
(int)hash_key.size());
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, 0, internal_info());
return;
}
if (sort_keys.empty()) {
derror("invalid sort keys: should not be empty");
if (callback != nullptr)
callback(PERR_INVALID_VALUE, 0, internal_info());
return;
}
::dsn::apps::multi_remove_request req;
req.hash_key = ::dsn::blob(hash_key.data(), 0, hash_key.size());
for (auto &sort_key : sort_keys) {
req.sort_keys.emplace_back(sort_key.data(), 0, sort_key.size());
}
::dsn::blob tmp_key;
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp)
{
if (user_callback == nullptr) {
return;
}
::dsn::apps::multi_remove_response response;
internal_info info;
int64_t deleted_count = 0;
if (err == ::dsn::ERR_OK) {
::dsn::unmarshall(resp, response);
info.app_id = response.app_id;
info.partition_index = response.partition_index;
info.decree = response.decree;
info.server = response.server;
deleted_count = response.count;
}
int ret =
get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err));
user_callback(ret, deleted_count, std::move(info));
};
_client->multi_remove(req,
std::move(new_callback),
std::chrono::milliseconds(timeout_milliseconds),
partition_hash);
}
int pegasus_client_impl::incr(const std::string &hash_key,
const std::string &sort_key,
int64_t increment,
int64_t &new_value,
int timeout_milliseconds,
int ttl_seconds,
internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int _err, int64_t _new_value, internal_info &&_info) {
ret = _err;
new_value = _new_value;
if (info != nullptr)
(*info) = std::move(_info);
op_completed.notify();
};
async_incr(
hash_key, sort_key, increment, std::move(callback), timeout_milliseconds, ttl_seconds);
op_completed.wait();
return ret;
}
void pegasus_client_impl::async_incr(const std::string &hash_key,
const std::string &sort_key,
int64_t increment,
async_incr_callback_t &&callback,
int timeout_milliseconds,
int ttl_seconds)
{
// check params
if (hash_key.size() >= UINT16_MAX) {
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d",
(int)hash_key.size());
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, 0, internal_info());
return;
}
if (ttl_seconds < -1) {
derror("invalid ttl seconds: should be no less than -1, but %d", ttl_seconds);
if (callback != nullptr)
callback(PERR_INVALID_ARGUMENT, 0, internal_info());
return;
}
::dsn::apps::incr_request req;
pegasus_generate_key(req.key, hash_key, sort_key);
req.increment = increment;
if (ttl_seconds <= 0)
req.expire_ts_seconds = ttl_seconds;
else
req.expire_ts_seconds = ttl_seconds + utils::epoch_now();
auto partition_hash = pegasus_key_hash(req.key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp)
{
if (user_callback == nullptr) {
return;
}
::dsn::apps::incr_response response;
internal_info info;
if (err == ::dsn::ERR_OK) {
::dsn::unmarshall(resp, response);
info.app_id = response.app_id;
info.partition_index = response.partition_index;
info.decree = response.decree;
info.server = response.server;
}
int ret =
get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err));
user_callback(ret, response.new_value, std::move(info));
};
_client->incr(req,
std::move(new_callback),
std::chrono::milliseconds(timeout_milliseconds),
partition_hash);
}
int pegasus_client_impl::check_and_set(const std::string &hash_key,
const std::string &check_sort_key,
cas_check_type check_type,
const std::string &check_operand,
const std::string &set_sort_key,
const std::string &set_value,
const check_and_set_options &options,
check_and_set_results &results,
int timeout_milliseconds,
internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int _err, check_and_set_results &&_results, internal_info &&_info) {
ret = _err;
results = std::move(_results);
if (info != nullptr)
(*info) = std::move(_info);
op_completed.notify();
};
async_check_and_set(hash_key,
check_sort_key,
check_type,
check_operand,
set_sort_key,
set_value,
options,
std::move(callback),
timeout_milliseconds);
op_completed.wait();
return ret;
}
void pegasus_client_impl::async_check_and_set(const std::string &hash_key,
const std::string &check_sort_key,
cas_check_type check_type,
const std::string &check_operand,
const std::string &set_sort_key,
const std::string &set_value,
const check_and_set_options &options,
async_check_and_set_callback_t &&callback,
int timeout_milliseconds)
{
// check params
if (hash_key.size() >= UINT16_MAX) {
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d",
(int)hash_key.size());
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, check_and_set_results(), internal_info());
return;
}
if (dsn::apps::_cas_check_type_VALUES_TO_NAMES.find(check_type) ==
dsn::apps::_cas_check_type_VALUES_TO_NAMES.end()) {
derror("invalid check type: %d", (int)check_type);
if (callback != nullptr)
callback(PERR_INVALID_ARGUMENT, check_and_set_results(), internal_info());
return;
}
::dsn::apps::check_and_set_request req;
req.hash_key.assign(hash_key.c_str(), 0, hash_key.size());
req.check_sort_key.assign(check_sort_key.c_str(), 0, check_sort_key.size());
req.check_type = (dsn::apps::cas_check_type::type)check_type;
req.check_operand.assign(check_operand.c_str(), 0, check_operand.size());
if (check_sort_key != set_sort_key) {
req.set_diff_sort_key = true;
req.set_sort_key.assign(set_sort_key.c_str(), 0, set_sort_key.size());
}
req.set_value.assign(set_value.c_str(), 0, set_value.size());
if (options.set_value_ttl_seconds == 0)
req.set_expire_ts_seconds = 0;
else
req.set_expire_ts_seconds = options.set_value_ttl_seconds + utils::epoch_now();
req.return_check_value = options.return_check_value;
::dsn::blob tmp_key;
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp)
{
if (user_callback == nullptr) {
return;
}
check_and_set_results results;
internal_info info;
::dsn::apps::check_and_set_response response;
if (err == ::dsn::ERR_OK) {
::dsn::unmarshall(resp, response);
if (response.error == 0) {
results.set_succeed = true;
} else if (response.error == 13) { // kTryAgain
results.set_succeed = false;
response.error = 0;
} else {
results.set_succeed = false;
}
if (response.check_value_returned) {
results.check_value_returned = true;
if (response.check_value_exist) {
results.check_value_exist = true;
results.check_value.assign(response.check_value.data(),
response.check_value.length());
}
}
info.app_id = response.app_id;
info.partition_index = response.partition_index;
info.decree = response.decree;
info.server = response.server;
}
int ret =
get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err));
user_callback(ret, std::move(results), std::move(info));
};
_client->check_and_set(req,
std::move(new_callback),
std::chrono::milliseconds(timeout_milliseconds),
partition_hash);
}
int pegasus_client_impl::check_and_mutate(const std::string &hash_key,
const std::string &check_sort_key,
cas_check_type check_type,
const std::string &check_operand,
const mutations &mutations,
const check_and_mutate_options &options,
check_and_mutate_results &results,
int timeout_milliseconds,
internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int _err, check_and_mutate_results &&_results, internal_info &&_info) {
ret = _err;
results = std::move(_results);
if (info != nullptr)
(*info) = std::move(_info);
op_completed.notify();
};
async_check_and_mutate(hash_key,
check_sort_key,
check_type,
check_operand,
mutations,
options,
std::move(callback),
timeout_milliseconds);
op_completed.wait();
return ret;
}
void pegasus_client_impl::async_check_and_mutate(const std::string &hash_key,
const std::string &check_sort_key,
cas_check_type check_type,
const std::string &check_operand,
const mutations &mutations,
const check_and_mutate_options &options,
async_check_and_mutate_callback_t &&callback,
int timeout_milliseconds)
{
// check params
if (hash_key.size() >= UINT16_MAX) {
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d",
(int)hash_key.size());
if (callback != nullptr)
callback(PERR_INVALID_HASH_KEY, check_and_mutate_results(), internal_info());
return;
}
if (dsn::apps::_cas_check_type_VALUES_TO_NAMES.find(check_type) ==
dsn::apps::_cas_check_type_VALUES_TO_NAMES.end()) {
derror("invalid check type: %d", (int)check_type);
if (callback != nullptr)
callback(PERR_INVALID_ARGUMENT, check_and_mutate_results(), internal_info());
return;
}
if (mutations.is_empty()) {
derror("invalid mutations: mutations should not be empty.");
if (callback != nullptr)
callback(PERR_INVALID_ARGUMENT, check_and_mutate_results(), internal_info());
return;
}
::dsn::apps::check_and_mutate_request req;
req.hash_key.assign(hash_key.c_str(), 0, hash_key.size());
req.check_sort_key.assign(check_sort_key.c_str(), 0, check_sort_key.size());
req.check_type = (dsn::apps::cas_check_type::type)check_type;
req.check_operand.assign(check_operand.c_str(), 0, check_operand.size());
std::vector<mutate> mutate_list;
mutations.get_mutations(mutate_list);
req.mutate_list.resize(mutate_list.size());
for (int i = 0; i < mutate_list.size(); ++i) {
auto &mu = mutate_list[i];
req.mutate_list[i].operation = (dsn::apps::mutate_operation::type)mu.operation;
req.mutate_list[i].sort_key = blob::create_from_bytes(std::move(mu.sort_key));
if (mu.operation == mutate::mutate_operation::MO_PUT) {
req.mutate_list[i].value = blob::create_from_bytes(std::move(mu.value));
req.mutate_list[i].set_expire_ts_seconds = mu.set_expire_ts_seconds;
}
}
req.return_check_value = options.return_check_value;
::dsn::blob tmp_key;
pegasus_generate_key(tmp_key, req.hash_key, ::dsn::blob());
auto partition_hash = pegasus_key_hash(tmp_key);
auto new_callback = [user_callback = std::move(callback)](
::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp)
{
if (user_callback == nullptr) {
return;
}
check_and_mutate_results results;
internal_info info;
::dsn::apps::check_and_mutate_response response;
if (err == ::dsn::ERR_OK) {
::dsn::unmarshall(resp, response);
if (response.error == 0) {
results.mutate_succeed = true;
} else if (response.error == 13) { // kTryAgain
results.mutate_succeed = false;
response.error = 0;
} else {
results.mutate_succeed = false;
}
if (response.check_value_returned) {
results.check_value_returned = true;
if (response.check_value_exist) {
results.check_value_exist = true;
results.check_value.assign(response.check_value.data(),
response.check_value.length());
}
}
info.app_id = response.app_id;
info.partition_index = response.partition_index;
info.decree = response.decree;
info.server = response.server;
}
int ret =
get_client_error(err == ERR_OK ? get_rocksdb_server_error(response.error) : int(err));
user_callback(ret, std::move(results), std::move(info));
};
_client->check_and_mutate(req,
std::move(new_callback),
std::chrono::milliseconds(timeout_milliseconds),
partition_hash);
}
int pegasus_client_impl::ttl(const std::string &hash_key,
const std::string &sort_key,
int &ttl_seconds,
int timeout_milliseconds,
internal_info *info)
{
// check params
if (hash_key.size() >= UINT16_MAX) {
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d",
(int)hash_key.size());
return PERR_INVALID_HASH_KEY;
}
::dsn::blob req;
pegasus_generate_key(req, hash_key, sort_key);
auto partition_hash = pegasus_key_hash(req);
auto pr =
_client->ttl_sync(req, std::chrono::milliseconds(timeout_milliseconds), partition_hash);
if (pr.first == ERR_OK && pr.second.error == 0) {
ttl_seconds = pr.second.ttl_seconds;
}
if (info != nullptr) {
if (pr.first == ERR_OK) {
info->app_id = pr.second.app_id;
info->partition_index = pr.second.partition_index;
info->decree = -1;
info->server = pr.second.server;
} else {
info->app_id = -1;
info->partition_index = -1;
info->decree = -1;
}
}
return get_client_error(pr.first == ERR_OK ? get_rocksdb_server_error(pr.second.error)
: int(pr.first));
}
void pegasus_client_impl::async_get_scanner(const std::string &hash_key,
const std::string &start_sortkey,
const std::string &stop_sortkey,
const scan_options &options,
async_get_scanner_callback_t &&callback)
{
if (callback) {
pegasus_scanner *scanner;
int ret = get_scanner(hash_key, start_sortkey, stop_sortkey, options, scanner);
callback(ret, scanner);
}
}
int pegasus_client_impl::get_scanner(const std::string &hash_key,
const std::string &start_sort_key,
const std::string &stop_sort_key,
const scan_options &options,
pegasus_scanner *&scanner)
{
// check params
if (hash_key.size() >= UINT16_MAX) {
derror("invalid hash key: hash key length should be less than UINT16_MAX, but %d",
(int)hash_key.size());
return PERR_INVALID_HASH_KEY;
}
if (hash_key.empty()) {
derror("invalid hash key: hash key cannot be empty when scan");
return PERR_INVALID_HASH_KEY;
}
::dsn::blob start;
::dsn::blob stop;
scan_options o(options);
// generate key range by start_sort_key and stop_sort_key
pegasus_generate_key(start, hash_key, start_sort_key);
if (stop_sort_key.empty()) {
pegasus_generate_next_blob(stop, hash_key);
o.stop_inclusive = false;
} else {
pegasus_generate_key(stop, hash_key, stop_sort_key);
}
// limit key range by prefix filter
if (o.sort_key_filter_type == filter_type::FT_MATCH_PREFIX &&
o.sort_key_filter_pattern.length() > 0) {
::dsn::blob prefix_start, prefix_stop;
pegasus_generate_key(prefix_start, hash_key, o.sort_key_filter_pattern);
pegasus_generate_next_blob(prefix_stop, hash_key, o.sort_key_filter_pattern);
if (::dsn::string_view(prefix_start).compare(start) > 0) {
start = std::move(prefix_start);
o.start_inclusive = true;
}
if (::dsn::string_view(prefix_stop).compare(stop) <= 0) {
stop = std::move(prefix_stop);
o.stop_inclusive = false;
}
}
// check if range is empty
std::vector<uint64_t> v;
int c = ::dsn::string_view(start).compare(stop);
if (c < 0 || (c == 0 && o.start_inclusive && o.stop_inclusive)) {
v.push_back(pegasus_key_hash(start));
}
scanner = new pegasus_scanner_impl(_client, std::move(v), o, start, stop, false);
return PERR_OK;
}
DEFINE_TASK_CODE_RPC(RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX,
TASK_PRIORITY_COMMON,
::dsn::THREAD_POOL_DEFAULT)
void pegasus_client_impl::async_get_unordered_scanners(
int max_split_count,
const scan_options &options,
async_get_unordered_scanners_callback_t &&callback)
{
if (!callback) {
return;
}
// check params
if (max_split_count <= 0) {
derror("invalid max_split_count: which should be greater than 0, but %d", max_split_count);
callback(PERR_INVALID_SPLIT_COUNT, std::vector<pegasus_scanner *>());
return;
}
auto new_callback = [ user_callback = std::move(callback), max_split_count, options, this ](
::dsn::error_code err, dsn::message_ex * req, dsn::message_ex * resp)
{
std::vector<pegasus_scanner *> scanners;
configuration_query_by_index_response response;
if (err == ERR_OK) {
::dsn::unmarshall(resp, response);
if (response.err == ERR_OK) {
unsigned int count = response.partition_count;
int split = count < max_split_count ? count : max_split_count;
scanners.resize(split);
int size = count / split;
int more = count - size * split;
for (int i = 0; i < split; i++) {
int s = size + (i < more);
std::vector<uint64_t> hash(s);
for (int j = 0; j < s; j++)
hash[j] = --count;
scanners[i] = new pegasus_scanner_impl(_client, std::move(hash), options, true);
}
}
}
int ret = get_client_error(err == ERR_OK ? int(response.err) : int(err));
user_callback(ret, std::move(scanners));
};
configuration_query_by_index_request req;
req.app_name = _app_name;
::dsn::rpc::call(_meta_server,
RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX,
req,
nullptr,
new_callback,
std::chrono::milliseconds(options.timeout_ms),
0,
0);
}
int pegasus_client_impl::get_unordered_scanners(int max_split_count,
const scan_options &options,
std::vector<pegasus_scanner *> &scanners)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](int err, std::vector<pegasus_scanner *> &&ss) {
ret = err;
scanners = std::move(ss);
op_completed.notify();
};
async_get_unordered_scanners(max_split_count, options, std::move(callback));
op_completed.wait();
return ret;
}
void pegasus_client_impl::async_duplicate(dsn::apps::duplicate_rpc rpc,
std::function<void(dsn::error_code)> &&callback,
dsn::task_tracker *tracker)
{
_client->duplicate(rpc, std::move(callback), tracker);
}
const char *pegasus_client_impl::get_error_string(int error_code) const
{
auto it = _client_error_to_string.find(error_code);
dassert(
it != _client_error_to_string.end(), "client error %d have no error string", error_code);
return it->second.c_str();
}
/*static*/ void pegasus_client_impl::init_error()
{
_client_error_to_string.clear();
#define PEGASUS_ERR_CODE(x, y, z) _client_error_to_string[y] = z
#include <pegasus/error_def.h>
#undef PEGASUS_ERR_CODE
_server_error_to_client.clear();
_server_error_to_client[::dsn::ERR_OK] = PERR_OK;
_server_error_to_client[::dsn::ERR_TIMEOUT] = PERR_TIMEOUT;
_server_error_to_client[::dsn::ERR_FILE_OPERATION_FAILED] = PERR_SERVER_INTERNAL_ERROR;
_server_error_to_client[::dsn::ERR_INVALID_STATE] = PERR_SERVER_CHANGED;
_server_error_to_client[::dsn::ERR_OBJECT_NOT_FOUND] = PERR_OBJECT_NOT_FOUND;
_server_error_to_client[::dsn::ERR_NETWORK_FAILURE] = PERR_NETWORK_FAILURE;
_server_error_to_client[::dsn::ERR_HANDLER_NOT_FOUND] = PERR_HANDLER_NOT_FOUND;
_server_error_to_client[::dsn::ERR_OPERATION_DISABLED] = PERR_OPERATION_DISABLED;
_server_error_to_client[::dsn::ERR_NOT_ENOUGH_MEMBER] = PERR_NOT_ENOUGH_MEMBER;
_server_error_to_client[::dsn::ERR_APP_NOT_EXIST] = PERR_APP_NOT_EXIST;
_server_error_to_client[::dsn::ERR_APP_EXIST] = PERR_APP_EXIST;
_server_error_to_client[::dsn::ERR_BUSY] = PERR_APP_BUSY;
// rocksdb error;
for (int i = 1001; i < 1013; i++) {
_server_error_to_client[-i] = -i;
}
}
/*static*/ int pegasus_client_impl::get_client_error(int server_error)
{
auto it = _server_error_to_client.find(server_error);
if (it != _server_error_to_client.end())
return it->second;
derror("can't find corresponding client error definition, server error:[%d:%s]",
server_error,
::dsn::error_code(server_error).to_string());
return PERR_UNKNOWN;
}
/*static*/ int pegasus_client_impl::get_rocksdb_server_error(int rocskdb_error)
{
return (rocskdb_error == 0) ? 0 : ROCSKDB_ERROR_START - rocskdb_error;
}
} // namespace client
} // namespace pegasus