| /* |
| * The MIT License (MIT) |
| * |
| * Copyright (c) 2015 Microsoft Corporation |
| * |
| * -=- Robust Distributed System Nucleus (rDSN) -=- |
| * |
| * Permission is hereby granted, free of charge, to any person obtaining a copy |
| * of this software and associated documentation files (the "Software"), to deal |
| * in the Software without restriction, including without limitation the rights |
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
| * copies of the Software, and to permit persons to whom the Software is |
| * furnished to do so, subject to the following conditions: |
| * |
| * The above copyright notice and this permission notice shall be included in |
| * all copies or substantial portions of the Software. |
| * |
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
| * THE SOFTWARE. |
| */ |
| |
| #include "nfs/nfs_server_impl.h" |
| |
| // IWYU pragma: no_include <ext/alloc_traits.h> |
| #include <chrono> |
| #include <cstdint> |
| #include <mutex> |
| #include <type_traits> |
| #include <vector> |
| |
| #include "absl/strings/string_view.h" |
| #include "nfs/nfs_code_definition.h" |
| #include "runtime/api_layer1.h" |
| #include "runtime/task/async_calls.h" |
| #include "utils/TokenBucket.h" |
| #include "utils/autoref_ptr.h" |
| #include "utils/env.h" |
| #include "utils/filesystem.h" |
| #include "utils/flags.h" |
| #include "utils/utils.h" |
| |
| METRIC_DEFINE_counter( |
| server, |
| nfs_server_copy_bytes, |
| dsn::metric_unit::kBytes, |
| "The accumulated data size in bytes that are read from local file in server during nfs copy"); |
| |
| METRIC_DEFINE_counter( |
| server, |
| nfs_server_copy_failed_requests, |
| dsn::metric_unit::kRequests, |
| "The number of nfs copy requests (received by server) that fail to read local file in server"); |
| |
| static const char *kMaxSendRateMegaBytesPerDiskDesc = |
| "The maximum bandwidth (MB/s) of reading data per local disk " |
| "when transferring data to remote node, 0 means no limit"; |
| DSN_DEFINE_int64(nfs, max_send_rate_megabytes_per_disk, 0, kMaxSendRateMegaBytesPerDiskDesc); |
| DSN_TAG_VARIABLE(max_send_rate_megabytes_per_disk, FT_MUTABLE); |
| |
| DSN_DECLARE_int32(file_close_timer_interval_ms_on_server); |
| DSN_DECLARE_int32(file_close_expire_time_ms); |
| |
| namespace dsn { |
| class disk_file; |
| |
| namespace service { |
| |
| nfs_service_impl::nfs_service_impl() |
| : ::dsn::serverlet<nfs_service_impl>("nfs"), |
| METRIC_VAR_INIT_server(nfs_server_copy_bytes), |
| METRIC_VAR_INIT_server(nfs_server_copy_failed_requests) |
| { |
| _file_close_timer = ::dsn::tasking::enqueue_timer( |
| LPC_NFS_FILE_CLOSE_TIMER, |
| &_tracker, |
| [this] { close_file(); }, |
| std::chrono::milliseconds(FLAGS_file_close_timer_interval_ms_on_server)); |
| |
| _send_token_buckets = std::make_unique<dsn::utils::token_buckets>(); |
| register_cli_commands(); |
| } |
| |
| void nfs_service_impl::on_copy(const ::dsn::service::copy_request &request, |
| ::dsn::rpc_replier<::dsn::service::copy_response> &reply) |
| { |
| std::string file_path = |
| dsn::utils::filesystem::path_combine(request.source_dir, request.file_name); |
| disk_file *dfile = nullptr; |
| |
| do { |
| zauto_lock l(_handles_map_lock); |
| auto it = _handles_map.find(file_path); // find file handle cache first |
| if (it == _handles_map.end()) { |
| dfile = file::open(file_path, file::FileOpenType::kReadOnly); |
| if (dfile == nullptr) { |
| LOG_ERROR("[nfs_service] open file {} failed", file_path); |
| ::dsn::service::copy_response resp; |
| resp.error = ERR_OBJECT_NOT_FOUND; |
| reply(resp); |
| return; |
| } |
| |
| auto fh = std::make_shared<file_handle_info_on_server>(); |
| fh->file_handle = dfile; |
| it = _handles_map.insert(std::make_pair(file_path, std::move(fh))).first; |
| } |
| dfile = it->second->file_handle; |
| it->second->file_access_count++; |
| it->second->last_access_time = dsn_now_ms(); |
| } while (false); |
| |
| CHECK_NOTNULL(dfile, ""); |
| LOG_DEBUG("nfs: copy from file {} [{}, {}]", |
| file_path, |
| request.offset, |
| request.offset + request.size); |
| |
| auto cp = std::make_shared<callback_para>(std::move(reply)); |
| cp->bb = blob(dsn::utils::make_shared_array<char>(request.size), request.size); |
| cp->dst_dir = request.dst_dir; |
| cp->source_disk_tag = request.source_disk_tag; |
| cp->file_path = std::move(file_path); |
| cp->offset = request.offset; |
| cp->size = request.size; |
| |
| auto buffer_save = cp->bb.buffer().get(); |
| |
| file::read( |
| dfile, |
| buffer_save, |
| request.size, |
| request.offset, |
| LPC_NFS_READ, |
| &_tracker, |
| [this, cp](error_code err, size_t sz) mutable { internal_read_callback(err, sz, *cp); }); |
| } |
| |
| void nfs_service_impl::internal_read_callback(error_code err, size_t sz, callback_para &cp) |
| { |
| if (FLAGS_max_send_rate_megabytes_per_disk > 0) { |
| _send_token_buckets->get_token_bucket(cp.source_disk_tag) |
| ->consumeWithBorrowAndWait(sz, |
| FLAGS_max_send_rate_megabytes_per_disk << 20, |
| 1.5 * (FLAGS_max_send_rate_megabytes_per_disk << 20)); |
| } |
| |
| { |
| zauto_lock l(_handles_map_lock); |
| auto it = _handles_map.find(cp.file_path); |
| |
| if (it != _handles_map.end()) { |
| it->second->file_access_count--; |
| } |
| } |
| |
| if (err != ERR_OK) { |
| LOG_ERROR("[nfs_service] read file {} failed, err = {}", cp.file_path, err); |
| METRIC_VAR_INCREMENT(nfs_server_copy_failed_requests); |
| } else { |
| METRIC_VAR_INCREMENT_BY(nfs_server_copy_bytes, sz); |
| } |
| |
| ::dsn::service::copy_response resp; |
| resp.error = err; |
| resp.file_content = std::move(cp.bb); |
| resp.offset = cp.offset; |
| resp.size = cp.size; |
| |
| cp.replier(resp); |
| } |
| |
| // RPC_NFS_NEW_NFS_GET_FILE_SIZE |
| void nfs_service_impl::on_get_file_size( |
| const ::dsn::service::get_file_size_request &request, |
| ::dsn::rpc_replier<::dsn::service::get_file_size_response> &reply) |
| { |
| get_file_size_response resp; |
| error_code err = ERR_OK; |
| std::string folder = request.source_dir; |
| // TODO(yingchun): refactor the following code! |
| if (request.file_list.size() == 0) // return all file size in the destination file folder |
| { |
| if (!dsn::utils::filesystem::directory_exists(folder)) { |
| LOG_ERROR("[nfs_service] directory {} not exist", folder); |
| err = ERR_OBJECT_NOT_FOUND; |
| } else { |
| std::vector<std::string> file_list; |
| if (!dsn::utils::filesystem::get_subfiles(folder, file_list, true)) { |
| LOG_ERROR("[nfs_service] get subfiles of directory {} failed", folder); |
| err = ERR_FILE_OPERATION_FAILED; |
| } else { |
| for (const auto &fpath : file_list) { |
| int64_t sz; |
| // TODO(yingchun): check if there are any files that are not sensitive (not |
| // encrypted). |
| if (!dsn::utils::filesystem::file_size( |
| fpath, dsn::utils::FileDataType::kSensitive, sz)) { |
| LOG_ERROR("[nfs_service] get size of file {} failed", fpath); |
| err = ERR_FILE_OPERATION_FAILED; |
| break; |
| } |
| |
| resp.size_list.push_back(sz); |
| resp.file_list.push_back( |
| fpath.substr(request.source_dir.length(), fpath.length() - 1)); |
| } |
| } |
| } |
| } else // return file size in the request file folder |
| { |
| for (const auto &file_name : request.file_list) { |
| std::string file_path = dsn::utils::filesystem::path_combine(folder, file_name); |
| int64_t sz; |
| // TODO(yingchun): check if there are any files that are not sensitive (not encrypted). |
| if (!dsn::utils::filesystem::file_size( |
| file_path, dsn::utils::FileDataType::kSensitive, sz)) { |
| LOG_ERROR("[nfs_service] get size of file {} failed", file_path); |
| err = ERR_FILE_OPERATION_FAILED; |
| break; |
| } |
| |
| resp.size_list.push_back(sz); |
| resp.file_list.push_back( |
| (folder + file_name) |
| .substr(request.source_dir.length(), (folder + file_name).length() - 1)); |
| } |
| } |
| |
| resp.error = err; |
| reply(resp); |
| } |
| |
| void nfs_service_impl::close_file() // release out-of-date file handle |
| { |
| zauto_lock l(_handles_map_lock); |
| |
| for (auto it = _handles_map.begin(); it != _handles_map.end();) { |
| auto fptr = it->second; |
| |
| // not used and expired |
| if (fptr->file_access_count == 0 && |
| dsn_now_ms() - fptr->last_access_time > (uint64_t)FLAGS_file_close_expire_time_ms) { |
| LOG_DEBUG("nfs: close file handle {}", it->first); |
| it = _handles_map.erase(it); |
| } else { |
| it++; |
| } |
| } |
| } |
| |
| // TODO(jiashuo1): just for compatibility with scripts, such as |
| // https://github.com/apache/incubator-pegasus/blob/v2.3/scripts/pegasus_offline_node_list.sh |
| void nfs_service_impl::register_cli_commands() |
| { |
| static std::once_flag flag; |
| std::call_once(flag, [&]() { |
| _nfs_max_send_rate_megabytes_cmd = dsn::command_manager::instance().register_int_command( |
| FLAGS_max_send_rate_megabytes_per_disk, |
| FLAGS_max_send_rate_megabytes_per_disk, |
| "nfs.max_send_rate_megabytes_per_disk", |
| kMaxSendRateMegaBytesPerDiskDesc); |
| }); |
| } |
| |
| } // namespace service |
| } // namespace dsn |