blob: 4645e781f4e69db91d7921255a4535cf81f12404 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#pragma once
#include <iostream>
#include "client/partition_resolver.h"
#include "duplication_internal_types.h"
#include "rrdb.code.definition.h"
#include "rrdb_types.h"
#include "runtime/rpc/rpc_holder.h"
#include "runtime/task/task_tracker.h"
#include "utils/optional.h"
namespace dsn {
namespace apps {
typedef rpc_holder<duplicate_request, duplicate_response> duplicate_rpc;
class rrdb_client
{
public:
rrdb_client() {}
explicit rrdb_client(const char *cluster_name,
const std::vector<dsn::host_port> &meta_list,
const char *app_name)
{
_resolver =
dsn::replication::partition_resolver::get_resolver(cluster_name, meta_list, app_name);
}
~rrdb_client() { _tracker.cancel_outstanding_tasks(); }
// ---------- call RPC_RRDB_RRDB_PUT ------------
// - synchronous
std::pair<::dsn::error_code, update_response>
put_sync(const update_request &args, std::chrono::milliseconds timeout, uint64_t partition_hash)
{
return ::dsn::rpc::wait_and_unwrap<update_response>(_resolver->call_op(
RPC_RRDB_RRDB_PUT, args, &_tracker, empty_rpc_handler, timeout, partition_hash));
}
// - asynchronous with on-stack update_request and update_response
template <typename TCallback>
::dsn::task_ptr put(const update_request &args,
TCallback &&callback,
std::chrono::milliseconds timeout,
uint64_t request_partition_hash,
int reply_thread_hash = 0)
{
return _resolver->call_op(RPC_RRDB_RRDB_PUT,
args,
&_tracker,
std::forward<TCallback>(callback),
timeout,
request_partition_hash,
reply_thread_hash);
}
// ---------- call RPC_RRDB_RRDB_MULTI_PUT ------------
// - synchronous
std::pair<::dsn::error_code, update_response> multi_put_sync(const multi_put_request &args,
std::chrono::milliseconds timeout,
uint64_t partition_hash)
{
return ::dsn::rpc::wait_and_unwrap<update_response>(_resolver->call_op(
RPC_RRDB_RRDB_MULTI_PUT, args, &_tracker, empty_rpc_handler, timeout, partition_hash));
}
// - asynchronous with on-stack multi_put_request and update_response
template <typename TCallback>
::dsn::task_ptr multi_put(const multi_put_request &args,
TCallback &&callback,
std::chrono::milliseconds timeout,
uint64_t request_partition_hash,
int reply_thread_hash = 0)
{
return _resolver->call_op(RPC_RRDB_RRDB_MULTI_PUT,
args,
&_tracker,
std::forward<TCallback>(callback),
timeout,
request_partition_hash,
reply_thread_hash);
}
// ---------- call RPC_RRDB_RRDB_REMOVE ------------
// - synchronous
std::pair<::dsn::error_code, update_response>
remove_sync(const ::dsn::blob &args, std::chrono::milliseconds timeout, uint64_t partition_hash)
{
return ::dsn::rpc::wait_and_unwrap<update_response>(_resolver->call_op(
RPC_RRDB_RRDB_REMOVE, args, &_tracker, empty_rpc_handler, timeout, partition_hash));
}
// - asynchronous with on-stack ::dsn::blob and update_response
template <typename TCallback>
::dsn::task_ptr remove(const ::dsn::blob &args,
TCallback &&callback,
std::chrono::milliseconds timeout,
uint64_t request_partition_hash,
int reply_thread_hash = 0)
{
return _resolver->call_op(RPC_RRDB_RRDB_REMOVE,
args,
&_tracker,
std::forward<TCallback>(callback),
timeout,
request_partition_hash,
reply_thread_hash);
}
// ---------- call RPC_RRDB_RRDB_MULTI_REMOVE ------------
// - synchronous
std::pair<::dsn::error_code, multi_remove_response>
multi_remove_sync(const multi_remove_request &args,
std::chrono::milliseconds timeout,
uint64_t partition_hash)
{
return ::dsn::rpc::wait_and_unwrap<multi_remove_response>(
_resolver->call_op(RPC_RRDB_RRDB_MULTI_REMOVE,
args,
&_tracker,
empty_rpc_handler,
timeout,
partition_hash));
}
// - asynchronous with on-stack multi_remove_request and multi_remove_response
template <typename TCallback>
::dsn::task_ptr multi_remove(const multi_remove_request &args,
TCallback &&callback,
std::chrono::milliseconds timeout,
uint64_t request_partition_hash,
int reply_thread_hash = 0)
{
return _resolver->call_op(RPC_RRDB_RRDB_MULTI_REMOVE,
args,
&_tracker,
std::forward<TCallback>(callback),
timeout,
request_partition_hash,
reply_thread_hash);
}
// ---------- call RPC_RRDB_RRDB_INCR ------------
// - synchronous
std::pair<::dsn::error_code, incr_response>
incr_sync(const incr_request &args, std::chrono::milliseconds timeout, uint64_t partition_hash)
{
return ::dsn::rpc::wait_and_unwrap<incr_response>(_resolver->call_op(
RPC_RRDB_RRDB_INCR, args, &_tracker, empty_rpc_handler, timeout, partition_hash));
}
// - asynchronous with on-stack incr_request and incr_response
template <typename TCallback>
::dsn::task_ptr incr(const incr_request &args,
TCallback &&callback,
std::chrono::milliseconds timeout,
uint64_t request_partition_hash,
int reply_thread_hash = 0)
{
return _resolver->call_op(RPC_RRDB_RRDB_INCR,
args,
&_tracker,
std::forward<TCallback>(callback),
timeout,
request_partition_hash,
reply_thread_hash);
}
// ---------- call RPC_RRDB_RRDB_CHECK_AND_SET ------------
// - synchronous
std::pair<::dsn::error_code, check_and_set_response>
check_and_set_sync(const check_and_set_request &args,
std::chrono::milliseconds timeout,
uint64_t partition_hash)
{
return ::dsn::rpc::wait_and_unwrap<check_and_set_response>(
_resolver->call_op(RPC_RRDB_RRDB_CHECK_AND_SET,
args,
&_tracker,
empty_rpc_handler,
timeout,
partition_hash));
}
// - asynchronous with on-stack check_and_set_request and check_and_set_response
template <typename TCallback>
::dsn::task_ptr check_and_set(const check_and_set_request &args,
TCallback &&callback,
std::chrono::milliseconds timeout,
uint64_t request_partition_hash,
int reply_thread_hash = 0)
{
return _resolver->call_op(RPC_RRDB_RRDB_CHECK_AND_SET,
args,
&_tracker,
std::forward<TCallback>(callback),
timeout,
request_partition_hash,
reply_thread_hash);
}
// ---------- call RPC_RRDB_RRDB_CHECK_AND_MUTATE ------------
// - synchronous
std::pair<::dsn::error_code, check_and_mutate_response>
check_and_mutate_sync(const check_and_mutate_request &args,
std::chrono::milliseconds timeout,
uint64_t partition_hash)
{
return ::dsn::rpc::wait_and_unwrap<check_and_mutate_response>(
_resolver->call_op(RPC_RRDB_RRDB_CHECK_AND_MUTATE,
args,
&_tracker,
empty_rpc_handler,
timeout,
partition_hash));
}
// - asynchronous with on-stack check_and_mutate_request and check_and_mutate_response
template <typename TCallback>
::dsn::task_ptr check_and_mutate(const check_and_mutate_request &args,
TCallback &&callback,
std::chrono::milliseconds timeout,
uint64_t request_partition_hash,
int reply_thread_hash = 0)
{
return _resolver->call_op(RPC_RRDB_RRDB_CHECK_AND_MUTATE,
args,
&_tracker,
std::forward<TCallback>(callback),
timeout,
request_partition_hash,
reply_thread_hash);
}
// ---------- call RPC_RRDB_RRDB_GET ------------
// - synchronous
std::pair<::dsn::error_code, read_response>
get_sync(const ::dsn::blob &args, std::chrono::milliseconds timeout, uint64_t partition_hash)
{
return ::dsn::rpc::wait_and_unwrap<read_response>(_resolver->call_op(
RPC_RRDB_RRDB_GET, args, &_tracker, empty_rpc_handler, timeout, partition_hash));
}
// - asynchronous with on-stack ::dsn::blob and read_response
template <typename TCallback>
::dsn::task_ptr get(const ::dsn::blob &args,
TCallback &&callback,
std::chrono::milliseconds timeout,
uint64_t request_partition_hash,
int reply_thread_hash = 0)
{
return _resolver->call_op(RPC_RRDB_RRDB_GET,
args,
&_tracker,
std::forward<TCallback>(callback),
timeout,
request_partition_hash,
reply_thread_hash);
}
// ---------- call RPC_RRDB_RRDB_MULTI_GET ------------
// - synchronous
std::pair<::dsn::error_code, multi_get_response> multi_get_sync(
const multi_get_request &args, std::chrono::milliseconds timeout, uint64_t partition_hash)
{
return ::dsn::rpc::wait_and_unwrap<multi_get_response>(_resolver->call_op(
RPC_RRDB_RRDB_MULTI_GET, args, &_tracker, empty_rpc_handler, timeout, partition_hash));
}
// - asynchronous with on-stack multi_get_request and multi_get_response
template <typename TCallback>
::dsn::task_ptr multi_get(const multi_get_request &args,
TCallback &&callback,
std::chrono::milliseconds timeout,
uint64_t request_partition_hash,
int reply_thread_hash = 0)
{
return _resolver->call_op(RPC_RRDB_RRDB_MULTI_GET,
args,
&_tracker,
std::forward<TCallback>(callback),
timeout,
request_partition_hash,
reply_thread_hash);
}
// ---------- call RPC_RRDB_RRDB_BATCH_GET ------------
// - synchronous
std::pair<::dsn::error_code, batch_get_response> batch_get_sync(
const batch_get_request &args, std::chrono::milliseconds timeout, uint64_t partition_hash)
{
return ::dsn::rpc::wait_and_unwrap<batch_get_response>(_resolver->call_op(
RPC_RRDB_RRDB_BATCH_GET, args, &_tracker, empty_rpc_handler, timeout, partition_hash));
}
// - asynchronous with on-stack BatchGetRequest and BatchGetResponse
template <typename TCallback>
::dsn::task_ptr batch_get(const batch_get_request &args,
TCallback &&callback,
std::chrono::milliseconds timeout,
uint64_t request_partition_hash,
int reply_thread_hash = 0)
{
return _resolver->call_op(RPC_RRDB_RRDB_BATCH_GET,
args,
&_tracker,
std::forward<TCallback>(callback),
timeout,
request_partition_hash,
reply_thread_hash);
}
// ---------- call RPC_RRDB_RRDB_SORTKEY_COUNT ------------
// - synchronous
std::pair<::dsn::error_code, count_response> sortkey_count_sync(
const ::dsn::blob &args, std::chrono::milliseconds timeout, uint64_t partition_hash)
{
return ::dsn::rpc::wait_and_unwrap<count_response>(
_resolver->call_op(RPC_RRDB_RRDB_SORTKEY_COUNT,
args,
&_tracker,
empty_rpc_handler,
timeout,
partition_hash));
}
// - asynchronous with on-stack ::dsn::blob and count_response
template <typename TCallback>
::dsn::task_ptr sortkey_count(const ::dsn::blob &args,
TCallback &&callback,
std::chrono::milliseconds timeout,
uint64_t request_partition_hash,
int reply_thread_hash = 0)
{
return _resolver->call_op(RPC_RRDB_RRDB_SORTKEY_COUNT,
args,
&_tracker,
std::forward<TCallback>(callback),
timeout,
request_partition_hash,
reply_thread_hash);
}
// ---------- call RPC_RRDB_RRDB_TTL ------------
// - synchronous
std::pair<::dsn::error_code, ttl_response>
ttl_sync(const ::dsn::blob &args, std::chrono::milliseconds timeout, uint64_t partition_hash)
{
return ::dsn::rpc::wait_and_unwrap<ttl_response>(_resolver->call_op(
RPC_RRDB_RRDB_TTL, args, &_tracker, empty_rpc_handler, timeout, partition_hash));
}
// - asynchronous with on-stack ::dsn::blob and ttl_response
template <typename TCallback>
::dsn::task_ptr ttl(const ::dsn::blob &args,
TCallback &&callback,
std::chrono::milliseconds timeout,
uint64_t request_partition_hash,
int reply_thread_hash = 0)
{
return _resolver->call_op(RPC_RRDB_RRDB_TTL,
args,
&_tracker,
std::forward<TCallback>(callback),
timeout,
request_partition_hash,
reply_thread_hash);
}
// ---------- call RPC_RRDB_RRDB_GET_SCANNER ------------
// - synchronous
std::pair<::dsn::error_code, scan_response>
get_scanner_sync(const get_scanner_request &args,
std::chrono::milliseconds timeout,
uint64_t partition_hash,
dsn::optional<::dsn::rpc_address> server_addr = dsn::none)
{
return ::dsn::rpc::wait_and_unwrap<scan_response>(
_resolver->call_op(RPC_RRDB_RRDB_GET_SCANNER,
args,
&_tracker,
empty_rpc_handler,
timeout,
partition_hash));
}
// - asynchronous with on-stack get_scanner_request and scan_response
template <typename TCallback>
::dsn::task_ptr get_scanner(const get_scanner_request &args,
TCallback &&callback,
std::chrono::milliseconds timeout,
uint64_t request_partition_hash,
int reply_thread_hash = 0)
{
return _resolver->call_op(RPC_RRDB_RRDB_GET_SCANNER,
args,
&_tracker,
std::forward<TCallback>(callback),
timeout,
request_partition_hash,
reply_thread_hash);
}
// ---------- call RPC_RRDB_RRDB_SCAN ------------
// - synchronous
std::pair<::dsn::error_code, scan_response>
scan_sync(const scan_request &args, std::chrono::milliseconds timeout, uint64_t partition_hash)
{
return ::dsn::rpc::wait_and_unwrap<scan_response>(_resolver->call_op(
RPC_RRDB_RRDB_SCAN, args, &_tracker, empty_rpc_handler, timeout, partition_hash));
}
// - asynchronous with on-stack scan_request and scan_response
template <typename TCallback>
::dsn::task_ptr scan(const scan_request &args,
TCallback &&callback,
std::chrono::milliseconds timeout,
uint64_t request_partition_hash,
int reply_thread_hash = 0)
{
return _resolver->call_op(RPC_RRDB_RRDB_SCAN,
args,
&_tracker,
std::forward<TCallback>(callback),
timeout,
request_partition_hash,
reply_thread_hash);
}
// ---------- call RPC_RRDB_RRDB_CLEAR_SCANNER ------------
void clear_scanner(const int64_t &args, uint64_t partition_hash)
{
_resolver->call_op(RPC_RRDB_RRDB_CLEAR_SCANNER,
args,
nullptr,
empty_rpc_handler,
std::chrono::milliseconds(0),
partition_hash);
}
// ---------- call RPC_RRDB_RRDB_DUPLICATE ------------
// - asynchronous with on-stack duplicate_request and duplicate_response
template <typename TCallback>
task_ptr duplicate(duplicate_rpc &rpc, TCallback &&callback, dsn::task_tracker *tracker)
{
return rpc.call(_resolver, tracker, std::forward<TCallback &&>(callback));
}
private:
dsn::replication::partition_resolver_ptr _resolver;
dsn::task_tracker _tracker;
};
} // namespace apps
} // namespace dsn