| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "runtime/io/disk-io-mgr.h" |
| |
| #include "common/global-flags.h" |
| #include "common/thread-debug-info.h" |
| #include "runtime/exec-env.h" |
| #include "runtime/io/data-cache.h" |
| #include "runtime/io/disk-io-mgr-internal.h" |
| #include "runtime/io/handle-cache.inline.h" |
| #include "runtime/io/error-converter.h" |
| |
| #include <boost/algorithm/string.hpp> |
| |
| #include "gutil/strings/substitute.h" |
| #include "util/bit-util.h" |
| #include "util/collection-metrics.h" |
| #include "util/disk-info.h" |
| #include "util/filesystem-util.h" |
| #include "util/hdfs-util.h" |
| #include "util/histogram-metric.h" |
| #include "util/metrics.h" |
| #include "util/test-info.h" |
| #include "util/time.h" |
| |
| #ifndef NDEBUG |
| DECLARE_int32(stress_scratch_write_delay_ms); |
| #endif |
| |
| #include "common/names.h" |
| |
| using namespace impala; |
| using namespace impala::io; |
| using namespace strings; |
| |
| using std::to_string; |
| |
| // Control the number of disks on the machine. If 0, this comes from the system |
| // settings. |
| DEFINE_int32(num_disks, 0, "Number of disks on data node."); |
| // Default IoMgr configs: |
| // The maximum number of the threads per disk is also the max queue depth per disk. |
| DEFINE_int32(num_threads_per_disk, 0, "Number of I/O threads per disk"); |
| // Data cache configuration |
| DEFINE_string(data_cache, "", "The configuration string for IO data cache. " |
| "Default to be an empty string so it's disabled. The configuration string is " |
| "expected to be a list of directories, separated by ',', followed by a ':' and " |
| "a capacity quota per directory. For example /data/0,/data/1:1TB means the cache " |
| "may use up to 2TB, with 1TB max in /data/0 and /data/1 respectively. Please note " |
| "that each Impala daemon on a host must have a unique caching directory."); |
| |
| // Rotational disks should have 1 thread per disk to minimize seeks. Non-rotational |
| // don't have this penalty and benefit from multiple concurrent IO requests. |
| static const int THREADS_PER_ROTATIONAL_DISK = 1; |
| static const int THREADS_PER_SOLID_STATE_DISK = 8; |
| |
| const int64_t DiskIoMgr::IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE; |
| |
| // The maximum number of the threads per rotational disk is also the max queue depth per |
| // rotational disk. |
| static const string num_io_threads_per_rotational_disk_help_msg = Substitute("Number of " |
| "I/O threads per rotational disk. Has priority over num_threads_per_disk. If neither" |
| " is set, defaults to $0 thread(s) per rotational disk", THREADS_PER_ROTATIONAL_DISK); |
| DEFINE_int32(num_io_threads_per_rotational_disk, 0, |
| num_io_threads_per_rotational_disk_help_msg.c_str()); |
| // The maximum number of the threads per solid state disk is also the max queue depth per |
| // solid state disk. |
| static const string num_io_threads_per_solid_state_disk_help_msg = Substitute("Number of" |
| " I/O threads per solid state disk. Has priority over num_threads_per_disk. If " |
| "neither is set, defaults to $0 thread(s) per solid state disk", |
| THREADS_PER_SOLID_STATE_DISK); |
| DEFINE_int32(num_io_threads_per_solid_state_disk, 0, |
| num_io_threads_per_solid_state_disk_help_msg.c_str()); |
| // The maximum number of remote HDFS I/O threads. HDFS access that are expected to be |
| // remote are placed on a separate remote disk queue. This is the queue depth for that |
| // queue. If 0, then the remote queue is not used and instead ranges are round-robined |
| // across the local disk queues. |
| DEFINE_int32(num_remote_hdfs_io_threads, 8, "Number of remote HDFS I/O threads"); |
| // The maximum number of S3 I/O threads. The default value of 16 was chosen emperically |
| // to maximize S3 throughput. Maximum throughput is achieved with multiple connections |
| // open to S3 and use of multiple CPU cores since S3 reads are relatively compute |
| // expensive (SSL and JNI buffer overheads). |
| DEFINE_int32(num_s3_io_threads, 16, "Number of S3 I/O threads"); |
| // The maximum number of ABFS I/O threads. TODO: choose the default empirically. |
| DEFINE_int32(num_abfs_io_threads, 16, "Number of ABFS I/O threads"); |
| // The maximum number of ADLS I/O threads. This number is a good default to have for |
| // clusters that may vary widely in size, due to an undocumented concurrency limit |
| // enforced by ADLS for a cluster, which spans between 500-700. For smaller clusters |
| // (~10 nodes), 64 threads would be more ideal. |
| DEFINE_int32(num_adls_io_threads, 16, "Number of ADLS I/O threads"); |
| |
| DECLARE_int64(min_buffer_size); |
| |
| // The number of cached file handles defines how much memory can be used per backend for |
| // caching frequently used file handles. Measurements indicate that a single file handle |
| // uses about 6kB of memory. 20k file handles will thus reserve ~120MB of memory. |
| // The actual amount of memory that is associated with a file handle can be larger |
| // or smaller, depending on the replication factor for this file or the path name. |
| DEFINE_uint64(max_cached_file_handles, 20000, "Maximum number of HDFS file handles " |
| "that will be cached. Disabled if set to 0."); |
| |
| // The unused file handle timeout specifies how long a file handle will remain in the |
| // cache if it is not being used. Aging out unused handles ensures that the cache is not |
| // wasting memory on handles that aren't useful. This allows users to specify a larger |
| // cache size, as the system will only use the memory on useful file handles. |
| // Additionally, cached file handles keep an open file descriptor for local files. |
| // If a file is deleted through HDFS, this open file descriptor can keep the disk space |
| // from being freed. When the metadata sees that a file has been deleted, the file handle |
| // will no longer be used by future queries. Aging out this file handle allows the |
| // disk space to be freed in an appropriate period of time. The default value is |
| // 6 hours. This was chosen to be less than a typical value for HDFS's fs.trash.interval. |
| // This means that when files are deleted via the trash, the file handle cache will |
| // have evicted the file handle before the files are flushed from the trash. This |
| // means that the file handle cache won't impact available disk space. |
| DEFINE_uint64(unused_file_handle_timeout_sec, 21600, "Maximum time, in seconds, that an " |
| "unused HDFS file handle will remain in the file handle cache. Disabled if set " |
| "to 0."); |
| |
| // The file handle cache is split into multiple independent partitions, each with its |
| // own lock and structures. A larger number of partitions reduces contention by |
| // concurrent accesses, but it also reduces the efficiency of the cache due to |
| // separate LRU lists. |
| // TODO: Test different number of partitions to determine an appropriate default |
| DEFINE_uint64(num_file_handle_cache_partitions, 16, "Number of partitions used by the " |
| "file handle cache."); |
| |
| // This parameter controls whether remote HDFS file handles are cached. It does not impact |
| // S3, ADLS, or ABFS file handles. |
| DEFINE_bool(cache_remote_file_handles, true, "Enable the file handle cache for " |
| "remote HDFS files."); |
| |
| // This parameter controls whether S3 file handles are cached. |
| DEFINE_bool(cache_s3_file_handles, true, "Enable the file handle cache for " |
| "S3 files."); |
| |
| static const char* DEVICE_NAME_METRIC_KEY_TEMPLATE = |
| "impala-server.io-mgr.queue-$0.device-name"; |
| static const char* READ_LATENCY_METRIC_KEY_TEMPLATE = |
| "impala-server.io-mgr.queue-$0.read-latency"; |
| static const char* READ_SIZE_METRIC_KEY_TEMPLATE = |
| "impala-server.io-mgr.queue-$0.read-size"; |
| |
| AtomicInt32 DiskIoMgr::next_disk_id_; |
| |
| string DiskIoMgr::DebugString() { |
| stringstream ss; |
| ss << "Disks: " << endl; |
| for (DiskQueue* disk_queue : disk_queues_) { |
| disk_queue->DebugString(&ss); |
| ss << endl; |
| } |
| return ss.str(); |
| } |
| |
| WriteRange::WriteRange( |
| const string& file, int64_t file_offset, int disk_id, WriteDoneCallback callback) |
| : RequestRange(RequestType::WRITE), callback_(callback) { |
| SetRange(file, file_offset, disk_id); |
| } |
| |
| void WriteRange::SetRange( |
| const std::string& file, int64_t file_offset, int disk_id) { |
| file_ = file; |
| offset_ = file_offset; |
| disk_id_ = disk_id; |
| } |
| |
| void WriteRange::SetData(const uint8_t* buffer, int64_t len) { |
| data_ = buffer; |
| len_ = len; |
| } |
| |
| static void CheckSseSupport() { |
| if (!CpuInfo::IsSupported(CpuInfo::SSE4_2)) { |
| LOG(WARNING) << "This machine does not support sse4_2. The default IO system " |
| "configurations are suboptimal for this hardware. Consider " |
| "increasing the number of threads per disk by restarting impalad " |
| "using the --num_threads_per_disk flag with a higher value"; |
| } |
| } |
| |
| // Utility function to select flag that is set (has a positive value) based on precedence |
| static inline int GetFirstPositiveVal(const int first_val, const int second_val, |
| const int default_val) { |
| return first_val > 0 ? first_val : (second_val > 0 ? second_val : default_val); |
| } |
| |
| DiskIoMgr::DiskIoMgr() : |
| num_io_threads_per_rotational_disk_(GetFirstPositiveVal( |
| FLAGS_num_io_threads_per_rotational_disk, FLAGS_num_threads_per_disk, |
| THREADS_PER_ROTATIONAL_DISK)), |
| num_io_threads_per_solid_state_disk_(GetFirstPositiveVal( |
| FLAGS_num_io_threads_per_solid_state_disk, FLAGS_num_threads_per_disk, |
| THREADS_PER_SOLID_STATE_DISK)), |
| max_buffer_size_(BitUtil::RoundUpToPowerOfTwo(FLAGS_read_size)), |
| min_buffer_size_(BitUtil::RoundDownToPowerOfTwo(FLAGS_min_buffer_size)), |
| file_handle_cache_(min(FLAGS_max_cached_file_handles, |
| FileSystemUtil::MaxNumFileHandles()), |
| FLAGS_num_file_handle_cache_partitions, |
| FLAGS_unused_file_handle_timeout_sec, &hdfs_monitor_) { |
| DCHECK_LE(READ_SIZE_MIN_VALUE, FLAGS_read_size); |
| int num_local_disks = DiskInfo::num_disks(); |
| if (FLAGS_num_disks < 0 || FLAGS_num_disks > DiskInfo::num_disks()) { |
| LOG(WARNING) << "Number of disks specified should be between 0 and the number of " |
| "logical disks on the system. Defaulting to system setting of " << |
| DiskInfo::num_disks() << " disks"; |
| } else if (FLAGS_num_disks > 0) { |
| num_local_disks = FLAGS_num_disks; |
| } |
| disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS); |
| CheckSseSupport(); |
| local_file_system_.reset(new LocalFileSystem()); |
| } |
| |
| DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk, |
| int threads_per_solid_state_disk, int64_t min_buffer_size, int64_t max_buffer_size) : |
| num_io_threads_per_rotational_disk_(threads_per_rotational_disk), |
| num_io_threads_per_solid_state_disk_(threads_per_solid_state_disk), |
| max_buffer_size_(BitUtil::RoundUpToPowerOfTwo(max_buffer_size)), |
| min_buffer_size_(BitUtil::RoundDownToPowerOfTwo(min_buffer_size)), |
| file_handle_cache_(min(FLAGS_max_cached_file_handles, |
| FileSystemUtil::MaxNumFileHandles()), |
| FLAGS_num_file_handle_cache_partitions, |
| FLAGS_unused_file_handle_timeout_sec, &hdfs_monitor_) { |
| if (num_local_disks == 0) num_local_disks = DiskInfo::num_disks(); |
| disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS); |
| CheckSseSupport(); |
| local_file_system_.reset(new LocalFileSystem()); |
| } |
| |
| DiskIoMgr::~DiskIoMgr() { |
| // Signal all threads to shut down, then wait for them to do so. |
| for (DiskQueue* disk_queue : disk_queues_) { |
| if (disk_queue != nullptr) disk_queue->ShutDown(); |
| } |
| disk_thread_group_.JoinAll(); |
| for (DiskQueue* disk_queue : disk_queues_) delete disk_queue; |
| if (cached_read_options_ != nullptr) hadoopRzOptionsFree(cached_read_options_); |
| if (remote_data_cache_) remote_data_cache_->ReleaseResources(); |
| } |
| |
| Status DiskIoMgr::Init() { |
| for (int i = 0; i < disk_queues_.size(); ++i) { |
| disk_queues_[i] = new DiskQueue(i); |
| int num_threads_per_disk; |
| string device_name; |
| if (i == RemoteDfsDiskId()) { |
| num_threads_per_disk = FLAGS_num_remote_hdfs_io_threads; |
| device_name = "HDFS remote"; |
| } else if (i == RemoteS3DiskId()) { |
| num_threads_per_disk = FLAGS_num_s3_io_threads; |
| device_name = "S3 remote"; |
| } else if (i == RemoteAbfsDiskId()) { |
| num_threads_per_disk = FLAGS_num_abfs_io_threads; |
| device_name = "ABFS remote"; |
| } else if (i == RemoteAdlsDiskId()) { |
| num_threads_per_disk = FLAGS_num_adls_io_threads; |
| device_name = "ADLS remote"; |
| } else if (DiskInfo::is_rotational(i)) { |
| num_threads_per_disk = num_io_threads_per_rotational_disk_; |
| // During tests, i may not point to an existing disk. |
| device_name = i < DiskInfo::num_disks() ? DiskInfo::device_name(i) : to_string(i); |
| } else { |
| num_threads_per_disk = num_io_threads_per_solid_state_disk_; |
| // During tests, i may not point to an existing disk. |
| device_name = i < DiskInfo::num_disks() ? DiskInfo::device_name(i) : to_string(i); |
| } |
| const string& i_string = Substitute("$0", i); |
| |
| // Unit tests may create multiple DiskIoMgrs, so we need to avoid re-registering the |
| // same metrics. |
| if (!TestInfo::is_test() |
| || ImpaladMetrics::IO_MGR_METRICS->FindMetricForTesting<StringProperty>( |
| Substitute(DEVICE_NAME_METRIC_KEY_TEMPLATE, i_string)) |
| == nullptr) { |
| ImpaladMetrics::IO_MGR_METRICS->AddProperty<string>( |
| DEVICE_NAME_METRIC_KEY_TEMPLATE, device_name, i_string); |
| } |
| int64_t ONE_HOUR_IN_NS = 60L * 60L * NANOS_PER_SEC; |
| HistogramMetric* read_latency = nullptr; |
| HistogramMetric* read_size = nullptr; |
| if (TestInfo::is_test()) { |
| read_latency = |
| ImpaladMetrics::IO_MGR_METRICS->FindMetricForTesting<HistogramMetric>( |
| Substitute(READ_LATENCY_METRIC_KEY_TEMPLATE, i_string)); |
| read_size = |
| ImpaladMetrics::IO_MGR_METRICS->FindMetricForTesting<HistogramMetric>( |
| Substitute(READ_SIZE_METRIC_KEY_TEMPLATE, i_string)); |
| } |
| disk_queues_[i]->set_read_latency(read_latency != nullptr ? read_latency : |
| ImpaladMetrics::IO_MGR_METRICS->RegisterMetric(new HistogramMetric( |
| MetricDefs::Get(READ_LATENCY_METRIC_KEY_TEMPLATE, i_string), |
| ONE_HOUR_IN_NS, 3))); |
| int64_t ONE_GB = 1024L * 1024L * 1024L; |
| disk_queues_[i]->set_read_size(read_size != nullptr ? read_size : |
| ImpaladMetrics::IO_MGR_METRICS->RegisterMetric(new HistogramMetric( |
| MetricDefs::Get(READ_SIZE_METRIC_KEY_TEMPLATE, i_string), ONE_GB, 3))); |
| for (int j = 0; j < num_threads_per_disk; ++j) { |
| stringstream ss; |
| ss << "work-loop(Disk: " << device_name << ", Thread: " << j << ")"; |
| std::unique_ptr<Thread> t; |
| RETURN_IF_ERROR(Thread::Create("disk-io-mgr", ss.str(), &DiskQueue::DiskThreadLoop, |
| disk_queues_[i], this, &t)); |
| disk_thread_group_.AddThread(move(t)); |
| } |
| } |
| // The file handle cache depends on the HDFS monitor, so initialize it first. |
| // Use the same number of threads for the HDFS monitor as there are Disk IO threads. |
| RETURN_IF_ERROR(hdfs_monitor_.Init(disk_thread_group_.Size())); |
| RETURN_IF_ERROR(file_handle_cache_.Init()); |
| |
| cached_read_options_ = hadoopRzOptionsAlloc(); |
| DCHECK(cached_read_options_ != nullptr); |
| // Disable checksumming for cached reads. |
| int ret = hadoopRzOptionsSetSkipChecksum(cached_read_options_, true); |
| DCHECK_EQ(ret, 0); |
| // Disable automatic fallback for cached reads. |
| ret = hadoopRzOptionsSetByteBufferPool(cached_read_options_, nullptr); |
| DCHECK_EQ(ret, 0); |
| |
| if (!FLAGS_data_cache.empty()) { |
| remote_data_cache_.reset(new DataCache(FLAGS_data_cache)); |
| RETURN_IF_ERROR(remote_data_cache_->Init()); |
| } |
| return Status::OK(); |
| } |
| |
| unique_ptr<RequestContext> DiskIoMgr::RegisterContext() { |
| return unique_ptr<RequestContext>(new RequestContext(this, disk_queues_)); |
| } |
| |
| void DiskIoMgr::UnregisterContext(RequestContext* reader) { |
| reader->CancelAndMarkInactive(); |
| } |
| |
| Status DiskIoMgr::ValidateScanRange(ScanRange* range) { |
| int disk_id = range->disk_id(); |
| if (disk_id < 0 || disk_id >= disk_queues_.size()) { |
| return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(), |
| Substitute("Invalid scan range. Bad disk id: $0", disk_id)); |
| } |
| if (range->offset() < 0) { |
| return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(), |
| Substitute("Invalid scan range. Negative offset $0", range->offset())); |
| } |
| if (range->len() <= 0) { |
| return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(), |
| Substitute("Invalid scan range. Non-positive length $0", range->len())); |
| } |
| if (range->bytes_to_read() <= 0) { |
| return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(), |
| Substitute("Invalid scan range. Non-positive bytes to read $0", |
| range->bytes_to_read())); |
| } |
| return Status::OK(); |
| } |
| |
| Status DiskIoMgr::AllocateBuffersForRange( |
| BufferPool::ClientHandle* bp_client, ScanRange* range, int64_t max_bytes) { |
| DCHECK_GE(max_bytes, min_buffer_size_); |
| DCHECK(range->external_buffer_tag() == ScanRange::ExternalBufferTag::NO_BUFFER) |
| << static_cast<int>(range->external_buffer_tag()) << " invalid to allocate buffers " |
| << "when already reading into an external buffer"; |
| BufferPool* bp = ExecEnv::GetInstance()->buffer_pool(); |
| Status status; |
| vector<unique_ptr<BufferDescriptor>> buffers; |
| for (int64_t buffer_size : ChooseBufferSizes(range->bytes_to_read(), max_bytes)) { |
| BufferPool::BufferHandle handle; |
| status = bp->AllocateBuffer(bp_client, buffer_size, &handle); |
| if (!status.ok()) goto error; |
| buffers.emplace_back(new BufferDescriptor(range, bp_client, move(handle))); |
| } |
| range->AddUnusedBuffers(move(buffers), false); |
| return Status::OK(); |
| error: |
| DCHECK(!status.ok()); |
| range->CleanUpBuffers(move(buffers)); |
| return status; |
| } |
| |
| vector<int64_t> DiskIoMgr::ChooseBufferSizes(int64_t scan_range_len, int64_t max_bytes) { |
| DCHECK_GE(max_bytes, min_buffer_size_); |
| vector<int64_t> buffer_sizes; |
| int64_t bytes_allocated = 0; |
| while (bytes_allocated < scan_range_len) { |
| int64_t bytes_remaining = scan_range_len - bytes_allocated; |
| // Either allocate a max-sized buffer or a smaller buffer to fit the rest of the |
| // range. |
| int64_t next_buffer_size; |
| if (bytes_remaining >= max_buffer_size_) { |
| next_buffer_size = max_buffer_size_; |
| } else { |
| next_buffer_size = |
| max(min_buffer_size_, BitUtil::RoundUpToPowerOfTwo(bytes_remaining)); |
| } |
| if (next_buffer_size + bytes_allocated > max_bytes) { |
| // Can't allocate the desired buffer size. Make sure to allocate at least one |
| // buffer. |
| if (bytes_allocated > 0) break; |
| next_buffer_size = BitUtil::RoundDownToPowerOfTwo(max_bytes); |
| } |
| DCHECK(BitUtil::IsPowerOf2(next_buffer_size)) << next_buffer_size; |
| buffer_sizes.push_back(next_buffer_size); |
| bytes_allocated += next_buffer_size; |
| } |
| return buffer_sizes; |
| } |
| |
| int64_t DiskIoMgr::ComputeIdealBufferReservation(int64_t scan_range_len) { |
| if (scan_range_len < max_buffer_size_) { |
| // Round up to nearest power-of-two buffer size - ideally we should do a single read |
| // I/O for this range. |
| return max(min_buffer_size_, BitUtil::RoundUpToPowerOfTwo(scan_range_len)); |
| } else { |
| // Round up to the nearest max-sized I/O buffer, capped by |
| // IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE - we should do one or more max-sized read |
| // I/Os for this range. |
| return min(IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE * max_buffer_size_, |
| BitUtil::RoundUpToPowerOf2(scan_range_len, max_buffer_size_)); |
| } |
| } |
| |
| // This function gets the next RequestRange to work on for this disk. It blocks until |
| // work is available or the thread is shut down. |
| // Work is available if there is a RequestContext with |
| // - A ScanRange with a buffer available, or |
| // - A WriteRange in unstarted_write_ranges_. |
| RequestRange* DiskQueue::GetNextRequestRange(RequestContext** request_context) { |
| // This loops returns either with work to do or when the disk IoMgr shuts down. |
| while (true) { |
| *request_context = nullptr; |
| { |
| unique_lock<mutex> disk_lock(lock_); |
| while (!shut_down_ && request_contexts_.empty()) { |
| // wait if there are no readers on the queue |
| work_available_.Wait(disk_lock); |
| } |
| if (shut_down_) break; |
| DCHECK(!request_contexts_.empty()); |
| |
| // Get the next reader and remove the reader so that another disk thread |
| // can't pick it up. It will be enqueued before issuing the read to HDFS |
| // so this is not a big deal (i.e. multiple disk threads can read for the |
| // same reader). |
| *request_context = request_contexts_.front(); |
| request_contexts_.pop_front(); |
| DCHECK(*request_context != nullptr); |
| // Must increment refcount to keep RequestContext after dropping 'disk_lock' |
| (*request_context)->IncrementDiskThreadAfterDequeue(disk_id_); |
| } |
| // Get the next range to process for this reader. If this context does not have a |
| // range, rinse and repeat. |
| RequestRange* range = (*request_context)->GetNextRequestRange(disk_id_); |
| if (range != nullptr) return range; |
| } |
| DCHECK(shut_down_); |
| return nullptr; |
| } |
| |
| void DiskQueue::DiskThreadLoop(DiskIoMgr* io_mgr) { |
| // The thread waits until there is work or the queue is shut down. If there is work, |
| // performs the read or write requested. Locks are not taken when reading from or |
| // writing to disk. |
| while (true) { |
| RequestContext* worker_context = nullptr; |
| RequestRange* range = GetNextRequestRange(&worker_context); |
| if (range == nullptr) { |
| DCHECK(shut_down_); |
| return; |
| } |
| // We are now working on behalf of a query, so set thread state appropriately. |
| // See also IMPALA-6254 and IMPALA-6417. |
| ScopedThreadContext tdi_scope(GetThreadDebugInfo(), worker_context->query_id(), |
| worker_context->instance_id()); |
| |
| if (range->request_type() == RequestType::READ) { |
| ScanRange* scan_range = static_cast<ScanRange*>(range); |
| ReadOutcome outcome = scan_range->DoRead(this, disk_id_); |
| worker_context->ReadDone(disk_id_, outcome, scan_range); |
| } else { |
| DCHECK(range->request_type() == RequestType::WRITE); |
| io_mgr->Write(worker_context, static_cast<WriteRange*>(range)); |
| } |
| } |
| } |
| |
| void DiskIoMgr::Write(RequestContext* writer_context, WriteRange* write_range) { |
| Status ret_status = Status::OK(); |
| FILE* file_handle = nullptr; |
| Status close_status = Status::OK(); |
| ret_status = local_file_system_->OpenForWrite(write_range->file(), O_RDWR | O_CREAT, |
| S_IRUSR | S_IWUSR, &file_handle); |
| if (!ret_status.ok()) goto end; |
| |
| ret_status = WriteRangeHelper(file_handle, write_range); |
| |
| close_status = local_file_system_->Fclose(file_handle, write_range); |
| if (ret_status.ok() && !close_status.ok()) ret_status = close_status; |
| |
| end: |
| writer_context->WriteDone(write_range, ret_status); |
| } |
| |
| Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, WriteRange* write_range) { |
| // Seek to the correct offset and perform the write. |
| RETURN_IF_ERROR(local_file_system_->Fseek(file_handle, write_range->offset(), SEEK_SET, |
| write_range)); |
| |
| #ifndef NDEBUG |
| if (FLAGS_stress_scratch_write_delay_ms > 0) { |
| SleepForMs(FLAGS_stress_scratch_write_delay_ms); |
| } |
| #endif |
| RETURN_IF_ERROR(local_file_system_->Fwrite(file_handle, write_range)); |
| |
| ImpaladMetrics::IO_MGR_BYTES_WRITTEN->Increment(write_range->len()); |
| return Status::OK(); |
| } |
| |
| int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) { |
| // If it's a remote range, check for an appropriate remote disk queue. |
| if (!expected_local) { |
| if (IsHdfsPath(file) && FLAGS_num_remote_hdfs_io_threads > 0) { |
| return RemoteDfsDiskId(); |
| } |
| if (IsS3APath(file)) return RemoteS3DiskId(); |
| if (IsABFSPath(file)) return RemoteAbfsDiskId(); |
| if (IsADLSPath(file)) return RemoteAdlsDiskId(); |
| } |
| // Assign to a local disk queue. |
| DCHECK(!IsS3APath(file)); // S3 is always remote. |
| DCHECK(!IsABFSPath(file)); // ABFS is always remote. |
| DCHECK(!IsADLSPath(file)); // ADLS is always remote. |
| if (disk_id == -1) { |
| // disk id is unknown, assign it an arbitrary one. |
| disk_id = next_disk_id_.Add(1); |
| } |
| // TODO: we need to parse the config for the number of dirs configured for this |
| // data node. |
| return disk_id % num_local_disks(); |
| } |
| |
| Status DiskIoMgr::GetExclusiveHdfsFileHandle(const hdfsFS& fs, |
| std::string* fname, int64_t mtime, RequestContext *reader, |
| unique_ptr<ExclusiveHdfsFileHandle>& fid_out) { |
| SCOPED_TIMER(reader->open_file_timer_); |
| unique_ptr<ExclusiveHdfsFileHandle> fid; |
| fid.reset(new ExclusiveHdfsFileHandle(fs, fname, mtime)); |
| RETURN_IF_ERROR(fid->Init(&hdfs_monitor_)); |
| fid_out.swap(fid); |
| ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(1L); |
| // Every exclusive file handle is considered a cache miss |
| ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO->Update(0L); |
| ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT->Increment(1L); |
| reader->cached_file_handles_miss_count_.Add(1L); |
| return Status::OK(); |
| } |
| |
| void DiskIoMgr::ReleaseExclusiveHdfsFileHandle(unique_ptr<ExclusiveHdfsFileHandle> fid) { |
| DCHECK(fid != nullptr); |
| ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L); |
| fid.reset(); |
| } |
| |
| Status DiskIoMgr::GetCachedHdfsFileHandle(const hdfsFS& fs, |
| std::string* fname, int64_t mtime, RequestContext *reader, |
| CachedHdfsFileHandle** handle_out) { |
| bool cache_hit; |
| SCOPED_TIMER(reader->open_file_timer_); |
| RETURN_IF_ERROR(file_handle_cache_.GetFileHandle(fs, fname, mtime, false, handle_out, |
| &cache_hit)); |
| ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(1L); |
| if (cache_hit) { |
| ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO->Update(1L); |
| ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_COUNT->Increment(1L); |
| reader->cached_file_handles_hit_count_.Add(1L); |
| } else { |
| ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_HIT_RATIO->Update(0L); |
| ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_MISS_COUNT->Increment(1L); |
| reader->cached_file_handles_miss_count_.Add(1L); |
| } |
| return Status::OK(); |
| } |
| |
| void DiskIoMgr::ReleaseCachedHdfsFileHandle(std::string* fname, |
| CachedHdfsFileHandle* fid) { |
| file_handle_cache_.ReleaseFileHandle(fname, fid, false); |
| ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L); |
| } |
| |
| Status DiskIoMgr::ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, |
| int64_t mtime, RequestContext* reader, CachedHdfsFileHandle** fid) { |
| bool cache_hit; |
| SCOPED_TIMER(reader->open_file_timer_); |
| ImpaladMetrics::IO_MGR_CACHED_FILE_HANDLES_REOPENED->Increment(1L); |
| file_handle_cache_.ReleaseFileHandle(fname, *fid, true); |
| // The old handle has been destroyed, so *fid must be overwritten before returning. |
| *fid = nullptr; |
| Status status = file_handle_cache_.GetFileHandle(fs, fname, mtime, true, fid, |
| &cache_hit); |
| if (!status.ok()) { |
| ImpaladMetrics::IO_MGR_NUM_FILE_HANDLES_OUTSTANDING->Increment(-1L); |
| return status; |
| } |
| DCHECK(!cache_hit); |
| return Status::OK(); |
| } |
| |
| void DiskQueue::ShutDown() { |
| { |
| unique_lock<mutex> disk_lock(lock_); |
| shut_down_ = true; |
| } |
| // All waiting threads should exit, so wake them all up. |
| work_available_.NotifyAll(); |
| } |
| |
| void DiskQueue::DebugString(stringstream* ss) { |
| unique_lock<mutex> lock(lock_); |
| *ss << "DiskQueue id=" << disk_id_ << " ptr=" << static_cast<void*>(this) << ":" ; |
| if (!request_contexts_.empty()) { |
| *ss << " Readers: "; |
| for (RequestContext* req_context: request_contexts_) { |
| *ss << static_cast<void*>(req_context); |
| } |
| } |
| } |
| |
| DiskQueue::~DiskQueue() { |
| for (RequestContext* context : request_contexts_) { |
| context->UnregisterDiskQueue(disk_id_); |
| } |
| } |