blob: 183ac38a97cc1d1221ec4a8657045fb1907ab868 [file] [log] [blame]
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#pragma once
#include <stddef.h>
#include <stdint.h>
#include <atomic>
#include <chrono>
#include <deque>
#include <list>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "aio/aio_task.h"
#include "aio/file_io.h"
#include "nfs_code_definition.h"
#include "nfs_types.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/task/async_calls.h"
#include "runtime/task/task.h"
#include "runtime/task/task_tracker.h"
#include "utils/TokenBucket.h"
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/fmt_logging.h"
#include "utils/metrics.h"
#include "utils/zlocks.h"
namespace dsn {
class command_deregister;
class disk_file;
namespace utils {
class token_buckets;
} // namespace utils
struct remote_copy_request;
namespace service {
using TokenBucket = folly::BasicTokenBucket<std::chrono::steady_clock>;
template <typename TCallback>
task_ptr async_nfs_get_file_size(const get_file_size_request &request,
TCallback &&callback,
std::chrono::milliseconds timeout,
rpc_address server_addr)
{
return rpc::call(server_addr,
RPC_NFS_GET_FILE_SIZE,
request,
nullptr,
std::forward<TCallback>(callback),
timeout);
}
template <typename TCallback>
task_ptr async_nfs_copy(const copy_request &request,
TCallback &&callback,
std::chrono::milliseconds timeout,
rpc_address server_addr)
{
return rpc::call(
server_addr, RPC_NFS_COPY, request, nullptr, std::forward<TCallback>(callback), timeout);
}
class nfs_client_impl
{
public:
struct copy_request_ex;
struct file_context;
struct file_wrapper;
struct user_request;
typedef ::dsn::ref_ptr<user_request> user_request_ptr;
typedef ::dsn::ref_ptr<file_context> file_context_ptr;
typedef ::dsn::ref_ptr<copy_request_ex> copy_request_ex_ptr;
typedef ::dsn::ref_ptr<file_wrapper> file_wrapper_ptr;
struct file_wrapper : public ::dsn::ref_counter
{
disk_file *file_handle;
file_wrapper() { file_handle = nullptr; }
~file_wrapper()
{
if (file_handle != nullptr) {
auto err = file::close(file_handle);
CHECK_EQ_MSG(err, ERR_OK, "file::close failed");
}
}
};
struct copy_request_ex : public ::dsn::ref_counter
{
file_context_ptr file_ctx; // reference to the owner
int index;
uint64_t offset;
uint32_t size;
bool is_last;
copy_response response;
::dsn::task_ptr remote_copy_task;
::dsn::task_ptr local_write_task;
bool is_ready_for_write;
bool is_valid;
int retry_count;
zlock lock; // to protect is_valid
copy_request_ex(const file_context_ptr &file, int idx, int try_count)
{
file_ctx = file;
index = idx;
offset = 0;
size = 0;
is_last = false;
is_ready_for_write = false;
is_valid = true;
retry_count = try_count;
}
};
struct file_context : public ::dsn::ref_counter
{
user_request_ptr user_req; // reference to the owner
std::string file_name;
uint64_t file_size;
file_wrapper_ptr file_holder;
int current_write_index;
int finished_segments;
std::vector<copy_request_ex_ptr> copy_requests;
file_context(const user_request_ptr &req, const std::string &file_nm, uint64_t sz)
{
user_req = req;
file_name = file_nm;
file_size = sz;
file_holder = new file_wrapper();
current_write_index = -1;
finished_segments = 0;
}
};
struct user_request : public ::dsn::ref_counter
{
zlock user_req_lock;
bool high_priority;
int low_queue_index;
get_file_size_request file_size_req;
::dsn::ref_ptr<aio_task> nfs_task;
std::atomic<int> finished_files;
std::atomic<int> concurrent_copy_count;
bool is_finished;
std::vector<file_context_ptr> file_contexts;
user_request()
{
high_priority = false;
low_queue_index = -1;
finished_files = 0;
concurrent_copy_count = 0;
is_finished = false;
}
};
struct random_robin_queue
{
int max_concurrent_copy_count_per_queue;
size_t total_count;
// each queue represents all requests for one user_request
std::list<std::deque<copy_request_ex_ptr>> queue_list;
// the next queue to pop request
std::list<std::deque<copy_request_ex_ptr>>::iterator pop_it;
random_robin_queue(int max_concurrent_copy_count_per_queue_)
{
max_concurrent_copy_count_per_queue = max_concurrent_copy_count_per_queue_;
total_count = 0;
pop_it = queue_list.end();
}
// push request queue as an unique sub-queue.
void push(std::deque<copy_request_ex_ptr> &&q)
{
total_count += q.size();
queue_list.emplace_back(std::move(q));
}
// push retry request to this queue.
// if the original sub-queue is exist, push to front of it,
// else push to a new sub-queue.
void push_retry(const copy_request_ex_ptr &p)
{
total_count++;
for (auto it = queue_list.begin(); it != queue_list.end(); ++it) {
if (it->front()->file_ctx->user_req.get() == p->file_ctx->user_req.get()) {
// belong the the same user_request
it->push_front(p);
return;
}
}
queue_list.emplace_back(std::deque<copy_request_ex_ptr>({p}));
}
// pop one request from this queue.
// return nullptr if no valid request found.
copy_request_ex_ptr pop()
{
copy_request_ex_ptr p;
if (total_count == 0)
return p;
if (pop_it == queue_list.end())
pop_it = queue_list.begin();
auto start_it = pop_it;
while (true) {
if (pop_it->front()->file_ctx->user_req->concurrent_copy_count <
max_concurrent_copy_count_per_queue) {
// ok, find one, pop from queue, and forward pop_it
p = pop_it->front();
pop_it->pop_front();
if (pop_it->empty()) {
pop_it = queue_list.erase(pop_it);
} else {
pop_it++;
}
total_count--;
break;
}
// forward pop_it
pop_it++;
if (pop_it == queue_list.end())
pop_it = queue_list.begin();
// iterate for a round
if (pop_it == start_it)
break;
}
return p;
}
bool empty() { return total_count == 0; }
};
public:
nfs_client_impl();
virtual ~nfs_client_impl();
// copy file request entry
void begin_remote_copy(std::shared_ptr<remote_copy_request> &rci, aio_task *nfs_task);
private:
void end_get_file_size(::dsn::error_code err,
const ::dsn::service::get_file_size_response &resp,
const user_request_ptr &ureq);
void continue_copy();
void
end_copy(::dsn::error_code err, const copy_response &resp, const copy_request_ex_ptr &reqc);
void continue_write();
void end_write(error_code err, size_t sz, const copy_request_ex_ptr &reqc);
void handle_completion(const user_request_ptr &req, error_code err);
void register_cli_commands();
private:
std::unique_ptr<dsn::utils::token_buckets>
_copy_token_buckets; // rate limiter of copy from remote
std::atomic<int> _concurrent_copy_request_count; // record concurrent request count, limited
// by max_concurrent_remote_copy_requests.
std::atomic<int> _concurrent_local_write_count; // record concurrent write count, limited
// by max_concurrent_local_writes.
std::atomic<int> _buffered_local_write_count; // record current buffered write count, limited
// by max_buffered_local_writes.
zlock _copy_requests_lock;
std::deque<copy_request_ex_ptr> _copy_requests_high;
random_robin_queue _copy_requests_low;
int _high_priority_remaining_time;
zlock _local_writes_lock;
std::deque<copy_request_ex_ptr> _local_writes;
METRIC_VAR_DECLARE_counter(nfs_client_copy_bytes);
METRIC_VAR_DECLARE_counter(nfs_client_copy_failed_requests);
METRIC_VAR_DECLARE_counter(nfs_client_write_bytes);
METRIC_VAR_DECLARE_counter(nfs_client_failed_writes);
std::unique_ptr<command_deregister> _nfs_max_copy_rate_megabytes_cmd;
dsn::task_tracker _tracker;
};
} // namespace service
} // namespace dsn