| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "io/fs/local_file_reader.h" |
| |
| #include <bthread/bthread.h> |
| // IWYU pragma: no_include <bthread/errno.h> |
| #include <bvar/bvar.h> |
| #include <errno.h> // IWYU pragma: keep |
| #include <fmt/format.h> |
| #include <glog/logging.h> |
| #include <unistd.h> |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <cstring> |
| #include <string> |
| #include <utility> |
| |
| #include "common/compiler_util.h" // IWYU pragma: keep |
| #include "cpp/sync_point.h" |
| #include "io/fs/err_utils.h" |
| #include "olap/data_dir.h" |
| #include "olap/olap_common.h" |
| #include "olap/options.h" |
| #include "runtime/thread_context.h" |
| #include "runtime/workload_management/io_throttle.h" |
| #include "util/async_io.h" |
| #include "util/debug_points.h" |
| #include "util/defer_op.h" |
| #include "util/doris_metrics.h" |
| |
| namespace doris { |
| namespace io { |
| // 1: initing 2: inited 0: before init |
| std::atomic_int BeConfDataDirReader::be_config_data_dir_list_state = 0; |
| |
| std::vector<doris::DataDirInfo> BeConfDataDirReader::be_config_data_dir_list; |
| |
| void BeConfDataDirReader::get_data_dir_by_file_path(io::Path* file_path, |
| std::string* data_dir_arg) { |
| int state = be_config_data_dir_list_state.load(std::memory_order_acquire); |
| if (state == 0) [[unlikely]] { |
| return; |
| } else if (state == 1) [[unlikely]] { |
| be_config_data_dir_list_state.wait(1); |
| } |
| |
| for (const auto& data_dir_info : be_config_data_dir_list) { |
| if (data_dir_info.path.size() >= file_path->string().size()) { |
| continue; |
| } |
| if (file_path->string().compare(0, data_dir_info.path.size(), data_dir_info.path) == 0) { |
| *data_dir_arg = data_dir_info.path; |
| break; |
| } |
| } |
| } |
| |
| void BeConfDataDirReader::init_be_conf_data_dir( |
| const std::vector<doris::StorePath>& store_paths, |
| const std::vector<doris::StorePath>& spill_store_paths, |
| const std::vector<doris::CachePath>& cache_paths) { |
| be_config_data_dir_list_state.store(1, std::memory_order_release); |
| Defer defer {[]() { |
| be_config_data_dir_list_state.store(2, std::memory_order_release); |
| be_config_data_dir_list_state.notify_all(); |
| }}; |
| for (int i = 0; i < store_paths.size(); i++) { |
| DataDirInfo data_dir_info; |
| data_dir_info.path = store_paths[i].path; |
| data_dir_info.storage_medium = store_paths[i].storage_medium; |
| data_dir_info.data_dir_type = DataDirType::OLAP_DATA_DIR; |
| data_dir_info.metric_name = "local_data_dir_" + std::to_string(i); |
| be_config_data_dir_list.push_back(data_dir_info); |
| } |
| |
| for (int i = 0; i < spill_store_paths.size(); i++) { |
| doris::DataDirInfo data_dir_info; |
| data_dir_info.path = spill_store_paths[i].path; |
| data_dir_info.storage_medium = spill_store_paths[i].storage_medium; |
| data_dir_info.data_dir_type = doris::DataDirType::SPILL_DISK_DIR; |
| data_dir_info.metric_name = "spill_data_dir_" + std::to_string(i); |
| be_config_data_dir_list.push_back(data_dir_info); |
| } |
| |
| for (int i = 0; i < cache_paths.size(); i++) { |
| doris::DataDirInfo data_dir_info; |
| data_dir_info.path = cache_paths[i].path; |
| data_dir_info.storage_medium = TStorageMedium::REMOTE_CACHE; |
| data_dir_info.data_dir_type = doris::DataDirType::DATA_CACHE_DIR; |
| data_dir_info.metric_name = "local_cache_dir_" + std::to_string(i); |
| be_config_data_dir_list.push_back(data_dir_info); |
| } |
| |
| std::sort(be_config_data_dir_list.begin(), be_config_data_dir_list.end(), |
| [](const DataDirInfo& a, const DataDirInfo& b) { |
| return a.path.length() > b.path.length(); |
| }); |
| } |
| |
| LocalFileReader::LocalFileReader(Path path, size_t file_size, int fd) |
| : _fd(fd), _path(std::move(path)), _file_size(file_size) { |
| _data_dir_path = ""; |
| BeConfDataDirReader::get_data_dir_by_file_path(&_path, &_data_dir_path); |
| DorisMetrics::instance()->local_file_open_reading->increment(1); |
| DorisMetrics::instance()->local_file_reader_total->increment(1); |
| } |
| |
| LocalFileReader::~LocalFileReader() { |
| WARN_IF_ERROR(close(), fmt::format("Failed to close file {}", _path.native())); |
| } |
| |
| Status LocalFileReader::close() { |
| bool expected = false; |
| if (_closed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) { |
| DorisMetrics::instance()->local_file_open_reading->increment(-1); |
| int res = -1; |
| if (bthread_self() == 0) { |
| res = ::close(_fd); |
| } else { |
| auto task = [&] { res = ::close(_fd); }; |
| AsyncIO::run_task(task, io::FileSystemType::LOCAL); |
| } |
| if (-1 == res) { |
| std::string err = errno_to_str(); |
| return localfs_error(errno, fmt::format("failed to close {}", _path.native())); |
| } |
| _fd = -1; |
| } |
| return Status::OK(); |
| } |
| |
| Status LocalFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, |
| const IOContext* /*io_ctx*/) { |
| TEST_SYNC_POINT_RETURN_WITH_VALUE("LocalFileReader::read_at_impl", |
| Status::IOError("inject io error")); |
| if (closed()) [[unlikely]] { |
| return Status::InternalError("read closed file: ", _path.native()); |
| } |
| |
| if (offset > _file_size) { |
| return Status::InternalError( |
| "offset exceeds file size(offset: {}, file size: {}, path: {})", offset, _file_size, |
| _path.native()); |
| } |
| size_t bytes_req = result.size; |
| char* to = result.data; |
| bytes_req = std::min(bytes_req, _file_size - offset); |
| *bytes_read = 0; |
| |
| LIMIT_LOCAL_SCAN_IO(get_data_dir_path(), bytes_read); |
| |
| while (bytes_req != 0) { |
| auto res = SYNC_POINT_HOOK_RETURN_VALUE(::pread(_fd, to, bytes_req, offset), |
| "LocalFileReader::pread", _fd, to); |
| DBUG_EXECUTE_IF("LocalFileReader::read_at_impl.io_error", { |
| auto sub_path = dp->param<std::string>("sub_path", ""); |
| if ((sub_path.empty() && _path.filename().compare(kTestFilePath)) || |
| (!sub_path.empty() && _path.native().find(sub_path) != std::string::npos)) { |
| res = -1; |
| errno = EIO; |
| LOG(WARNING) << Status::IOError("debug read io error: {}", _path.native()); |
| } |
| }); |
| if (UNLIKELY(-1 == res && errno != EINTR)) { |
| return localfs_error(errno, fmt::format("failed to read {}", _path.native())); |
| } |
| if (UNLIKELY(res == 0)) { |
| return Status::InternalError("cannot read from {}: unexpected EOF", _path.native()); |
| } |
| if (res > 0) { |
| to += res; |
| offset += res; |
| bytes_req -= res; |
| *bytes_read += res; |
| } |
| } |
| DorisMetrics::instance()->local_bytes_read_total->increment(*bytes_read); |
| return Status::OK(); |
| } |
| |
| } // namespace io |
| } // namespace doris |