blob: 928a932213d639c28c94dae41948c7eb603f8c78 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "pegasus_client_impl.h"
#include "base/pegasus_const.h"
using namespace ::dsn;
using namespace pegasus;
namespace pegasus {
namespace client {
// Constructs a scanner with no caller-supplied key range.
//
// Delegates to the range constructor using the class-wide _min and _max blobs as the
// start and stop keys, then overrides the inclusiveness flags copied from `options` so
// that the start bound is inclusive and the stop bound is exclusive.
//
// Parameters:
// - client:  rrdb client used to issue the scan RPCs.
// - hash:    hashes of the splits to scan; moved into the scanner.
// - options: scan options; start_inclusive / stop_inclusive are ignored (overridden below).
pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrdb_client *client,
                                                                std::vector<uint64_t> &&hash,
                                                                const scan_options &options)
    : pegasus_scanner_impl(client, std::move(hash), options, _min, _max)
{
    // The delegated constructor copied `options` verbatim; force the range to [_min, _max).
    _options.stop_inclusive = false;
    _options.start_inclusive = true;
}
// Constructs a scanner over the key range delimited by start_key and stop_key.
//
// Parameters:
// - client:    rrdb client used to issue the scan RPCs.
// - hash:      hashes of the splits to scan; moved into _splits_hash and consumed from
//              the back, one split at a time, by _async_next_internal().
// - options:   scan options, copied into _options.
// - start_key: start bound, copied into _start_key.
// - stop_key:  stop bound, copied into _stop_key.
//
// No RPC is issued here; the first request is sent lazily by the first async_next().
// NOTE(review): _hash is not set in this initializer list. Within this file it is
// assigned in _async_next_internal() before _start_scan()/_next_batch() read it --
// confirm whether the header gives it a default initializer.
pegasus_client_impl::pegasus_scanner_impl::pegasus_scanner_impl(::dsn::apps::rrdb_client *client,
std::vector<uint64_t> &&hash,
const scan_options &options,
const ::dsn::blob &start_key,
const ::dsn::blob &stop_key)
: _client(client),
_start_key(start_key),
_stop_key(stop_key),
_options(options),
_splits_hash(std::move(hash)),
// _p indexes into _kvs; -1 so that the first pre-increment in _async_next_internal() yields 0.
_p(-1),
// Start in the "completed" state so the first call pops a split hash and opens a scanner.
_context(SCAN_CONTEXT_ID_COMPLETED),
_rpc_started(false)
{
}
int pegasus_client_impl::pegasus_scanner_impl::next(std::string &hashkey,
std::string &sortkey,
std::string &value,
internal_info *info)
{
::dsn::utils::notify_event op_completed;
int ret = -1;
auto callback = [&](
int err, std::string &&hash, std::string &&sort, std::string &&str, internal_info &&ii) {
ret = err;
hashkey = std::move(hash);
sortkey = std::move(sort);
value = std::move(str);
if (info) {
(*info) = std::move(ii);
}
op_completed.notify();
};
async_next(std::move(callback));
op_completed.wait();
return ret;
}
// Queues `callback` to receive the next scan result.
//
// If no request is pending (the queue was empty), this call drives the scanner itself via
// _async_next_internal(). Otherwise the callback is only appended and will be served by
// whoever is already driving the scanner.
void pegasus_client_impl::pegasus_scanner_impl::async_next(async_scan_next_callback_t &&callback)
{
    _lock.lock();

    const bool nothing_pending = _queue.empty();
    _queue.emplace_back(std::move(callback));

    if (nothing_pending) {
        // _lock is intentionally still held here: _async_next_internal() expects to be
        // entered with it locked and releases it on every return path. Keeping it held
        // ensures that other callbacks won't be executed in this caller's thread.
        _async_next_internal();
        return;
    }

    // rpc in-progress; the callback will be executed when the rpc finishes
    _lock.unlock();
}
// Reports whether the callback queue is currently empty, i.e. no async_next() request is
// waiting to be served. The check is made under _lock.
bool pegasus_client_impl::pegasus_scanner_impl::safe_destructible() const
{
    ::dsn::zauto_lock guard(_lock);
    const bool no_pending_requests = _queue.empty();
    return no_pending_requests;
}
// Creates a pegasus_scanner_impl_wrapper around this scanner and returns it as a
// pegasus_scanner_wrapper.
// NOTE(review): the wrapper is constructed from the raw `this` pointer; presumably it
// takes ownership of the scanner -- confirm against the wrapper's constructor.
pegasus_client::pegasus_scanner_wrapper
pegasus_client_impl::pegasus_scanner_impl::get_smart_wrapper()
{
    pegasus_client::pegasus_scanner_wrapper wrapper =
        std::make_shared<pegasus_scanner_impl_wrapper>(this);
    return wrapper;
}
// Drives the scanner: serves queued callbacks from the buffered batch (_kvs) and, when the
// batch is exhausted, either moves on to the next split, opens a new server-side scanner,
// or requests the next batch.
//
// Locking contract: must be entered with _lock held and with a non-empty _queue; every
// return path releases _lock. _lock is dropped while a user callback runs and before an
// RPC is issued.
//
// rpc won't be executed concurrently
void pegasus_client_impl::pegasus_scanner_impl::_async_next_internal()
{
    // _lock will be locked out of the while block
    dassert(!_queue.empty(), "queue should not be empty when _async_next_internal start");

    // Receives the queue's remaining callbacks right before returning, so that they are
    // destroyed only when this function exits (after _lock has been released).
    std::list<async_scan_next_callback_t> temp;
    while (true) {
        // Advance the cursor; enter the loop body whenever the buffered batch is exhausted.
        while (++_p >= _kvs.size()) {
            if (_context == SCAN_CONTEXT_ID_COMPLETED) {
                // reach the end of one partition
                if (_splits_hash.empty()) {
                    // all completed: every queued request gets PERR_SCAN_COMPLETE
                    std::swap(_queue, temp);
                    _lock.unlock();
                    // ATTENTION: after unlock, member variables can not be used anymore
                    for (auto &callback : temp) {
                        if (callback) {
                            internal_info info;
                            info.app_id = -1;
                            info.partition_index = -1;
                            info.decree = -1;
                            callback(PERR_SCAN_COMPLETE,
                                     std::string(),
                                     std::string(),
                                     std::string(),
                                     std::move(info));
                        }
                    }
                    return;
                } else {
                    // move on to the next split; _split_reset() marks the context as
                    // not-existing so the next iteration opens a scanner for it
                    _hash = _splits_hash.back();
                    _splits_hash.pop_back();
                    _split_reset();
                }
            } else if (_context == SCAN_CONTEXT_ID_NOT_EXIST) {
                // no valid context_id found
                _lock.unlock();
                _start_scan();
                return;
            } else {
                // valid context_id
                _lock.unlock();
                _next_batch();
                return;
            }
        }

        // valid data got
        std::string hash_key, sort_key;
        pegasus_restore_key(_kvs[_p].key, hash_key, sort_key);
        std::string value(_kvs[_p].value.data(), _kvs[_p].value.length());

        // The reference stays valid while unlocked: concurrent async_next() calls only
        // append to the list, and only this function removes elements.
        auto &callback = _queue.front();
        if (callback) {
            internal_info info(_info);
            // Run user code without holding _lock.
            _lock.unlock();
            callback(PERR_OK,
                     std::move(hash_key),
                     std::move(sort_key),
                     std::move(value),
                     std::move(info));
            _lock.lock();
        }

        // Retire the request that was just served. This is done even when the callback
        // is empty, so that an empty callback consumes exactly one record like any other
        // request instead of staying at the front of the queue while the loop drains the
        // rest of the scan.
        if (_queue.size() == 1) {
            // keep the last callback until exit this function
            std::swap(temp, _queue);
            _lock.unlock();
            return;
        } else {
            _queue.pop_front();
        }
    }
}
void pegasus_client_impl::pegasus_scanner_impl::_next_batch()
{
::dsn::apps::scan_request req;
req.context_id = _context;
dassert(!_rpc_started, "");
_rpc_started = true;
_client->scan(req,
[this](::dsn::error_code err,
dsn::message_ex *req,
dsn::message_ex *resp) mutable { _on_scan_response(err, req, resp); },
std::chrono::milliseconds(_options.timeout_ms),
_hash);
}
void pegasus_client_impl::pegasus_scanner_impl::_start_scan()
{
::dsn::apps::get_scanner_request req;
if (_kvs.empty()) {
req.start_key = _start_key;
req.start_inclusive = _options.start_inclusive;
} else {
req.start_key = _kvs.back().key;
req.start_inclusive = false;
}
req.stop_key = _stop_key;
req.stop_inclusive = _options.stop_inclusive;
req.batch_size = _options.batch_size;
req.hash_key_filter_type = (dsn::apps::filter_type::type)_options.hash_key_filter_type;
req.hash_key_filter_pattern = ::dsn::blob(
_options.hash_key_filter_pattern.data(), 0, _options.hash_key_filter_pattern.size());
req.sort_key_filter_type = (dsn::apps::filter_type::type)_options.sort_key_filter_type;
req.sort_key_filter_pattern = ::dsn::blob(
_options.sort_key_filter_pattern.data(), 0, _options.sort_key_filter_pattern.size());
req.no_value = _options.no_value;
dassert(!_rpc_started, "");
_rpc_started = true;
_client->get_scanner(
req,
[this](::dsn::error_code err, dsn::message_ex *req, dsn::message_ex *resp) mutable {
_on_scan_response(err, req, resp);
},
std::chrono::milliseconds(_options.timeout_ms),
_hash);
}
// Completion handler shared by the get_scanner and scan RPCs.
//
// - RPC ok, server error 0: stores the returned batch and context, then resumes serving
//   queued callbacks.
// - RPC ok, server error maps to PERR_NOT_FOUND: marks the context as not-existing and
//   resumes, which makes _async_next_internal() open a new scanner.
// - Anything else: every queued callback is completed with the translated error code and
//   empty strings.
//
// Parameters:
// - err:  RPC-level result.
// - req:  request message (unused here).
// - resp: response message; unmarshalled only when err is ERR_OK.
void pegasus_client_impl::pegasus_scanner_impl::_on_scan_response(::dsn::error_code err,
                                                                  dsn::message_ex *req,
                                                                  dsn::message_ex *resp)
{
    dassert(_rpc_started, "");
    _rpc_started = false;

    const bool rpc_ok = (err == ERR_OK);
    ::dsn::apps::scan_response response;

    if (!rpc_ok) {
        // No response to read from: report placeholder info.
        _info.app_id = -1;
        _info.partition_index = -1;
        _info.decree = -1;
        _info.server = "";
    } else {
        ::dsn::unmarshall(resp, response);
        _info.app_id = response.app_id;
        _info.partition_index = response.partition_index;
        _info.decree = -1;
        _info.server = response.server;

        if (response.error == 0) {
            // New batch received. _async_next_internal() is entered with _lock held and
            // releases it itself.
            _lock.lock();
            _kvs = std::move(response.kvs);
            _p = -1;
            _context = response.context_id;
            _async_next_internal();
            return;
        }

        if (get_rocksdb_server_error(response.error) == PERR_NOT_FOUND) {
            // The server reported not-found: drop the context so a new scanner is opened.
            _lock.lock();
            _context = SCAN_CONTEXT_ID_NOT_EXIST;
            _async_next_internal();
            return;
        }
    }

    // An error occurred: translate it and fail every pending request with it.
    const auto raw_error = rpc_ok ? get_rocksdb_server_error(response.error) : int(err);
    auto ret = get_client_error(raw_error);
    internal_info info = _info;

    std::list<async_scan_next_callback_t> pending;
    {
        ::dsn::zauto_lock guard(_lock);
        pending.swap(_queue);
    }
    // ATTENTION: after unlock with empty queue, member variables can not be used anymore
    for (auto &callback : pending) {
        if (!callback) {
            continue;
        }
        callback(ret, std::string(), std::string(), std::string(), internal_info(info));
    }
}
// Resets the per-split scan state before a new split is scanned: marks the server-side
// context as not-existing, discards the buffered batch and rewinds the batch cursor.
void pegasus_client_impl::pegasus_scanner_impl::_split_reset()
{
    _context = SCAN_CONTEXT_ID_NOT_EXIST;
    _p = -1;
    _kvs.clear();
}
// Destroys the scanner. Asserts that no scan RPC is in flight and that no callback is
// still queued, then asks the server to release the scan context if one is still valid.
pegasus_client_impl::pegasus_scanner_impl::~pegasus_scanner_impl()
{
    dsn::zauto_lock guard(_lock);

    dassert(!_rpc_started, "all scan-rpc should be completed here");
    dassert(_queue.empty(), "queue should be empty");

    if (!_client) {
        return;
    }

    // Only contexts at or above SCAN_CONTEXT_ID_VALID_MIN are cleared on the server.
    if (_context >= SCAN_CONTEXT_ID_VALID_MIN) {
        _client->clear_scanner(_context, _hash);
    }
    _client = nullptr;
}
// Forwards an asynchronous "next" request to the wrapped scanner (_p).
//
// The user callback is wrapped in a lambda that also stores a copy of the shared_ptr _p,
// so the queued request itself holds a reference to the scanner until the lambda is
// destroyed.
//
// Parameters:
// - callback: invoked with the scan result; may be empty, in which case the result is
//   consumed and dropped instead of throwing std::bad_function_call from the thread that
//   completes the request.
void pegasus_client_impl::pegasus_scanner_impl_wrapper::async_next(
    async_scan_next_callback_t &&callback)
{
    // wrap shared_ptr _p with callback
    // `scanner_ref` is never read inside the lambda; it exists only to hold the reference.
    // (It was previously named `__p`; identifiers containing a double underscore are
    // reserved for the implementation.)
    _p->async_next([ scanner_ref = _p, user_callback = std::move(callback) ](
        int error_code,
        std::string &&hash_key,
        std::string &&sort_key,
        std::string &&value,
        internal_info &&info) {
        // The scanner only sees this (always non-empty) lambda, so its own empty-callback
        // checks cannot protect against an empty user callback; check it here.
        if (!user_callback) {
            return;
        }
        user_callback(error_code,
                      std::move(hash_key),
                      std::move(sort_key),
                      std::move(value),
                      std::move(info));
    });
}
// Backing bytes for the default scan bounds: two 0x00 bytes followed by two 0xFF bytes.
const char pegasus_client_impl::pegasus_scanner_impl::_holder[] = {
    '\x00', '\x00', '\xFF', '\xFF'};

// Default start/stop keys used by the constructor that takes no explicit key range.
// Both blobs are built on _holder rather than on their own storage.
// NOTE(review): the blob arguments appear to be (buffer, offset, length), which would
// make _min the two 0x00 bytes and _max the two 0xFF bytes -- confirm against dsn::blob.
const ::dsn::blob pegasus_client_impl::pegasus_scanner_impl::_min(_holder, 0, 2);
const ::dsn::blob pegasus_client_impl::pegasus_scanner_impl::_max(_holder, 2, 2);
} // namespace client
} // namespace pegasus