blob: 536ed88f2f015acd44608a40d6aa294b8e9bcb9b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/disk-io-mgr.h"
#include "runtime/disk-io-mgr-internal.h"
#include <boost/algorithm/string.hpp>
#include "gutil/strings/substitute.h"
#include "util/bit-util.h"
#include "util/hdfs-util.h"
#include "util/time.h"
#ifndef NDEBUG
#include "common/names.h"
using namespace impala;
using namespace strings;
// Control the number of disks on the machine. If 0, this comes from the system
// settings.
DEFINE_int32(num_disks, 0, "Number of disks on data node.");
// Default IoMgr configs:
// The maximum number of the threads per disk is also the max queue depth per disk.
// 0 appears to mean "choose automatically": when the IoMgr's num_threads_per_disk_ is 0,
// Init() uses THREADS_PER_ROTATIONAL_DISK or THREADS_PER_FLASH_DISK for a local disk.
// NOTE(review): the wiring from this flag to num_threads_per_disk_ is not visible here.
DEFINE_int32(num_threads_per_disk, 0, "number of threads per disk");
// The maximum number of remote HDFS I/O threads. HDFS accesses that are expected to be
// remote are placed on a separate remote disk queue. This is the queue depth for that
// queue. If 0, then the remote queue is not used and instead ranges are round-robined
// across the local disk queues.
DEFINE_int32(num_remote_hdfs_io_threads, 8, "number of remote HDFS I/O threads");
// The maximum number of S3 I/O threads. The default value of 16 was chosen empirically
// to maximize S3 throughput. Maximum throughput is achieved with multiple connections
// open to S3 and use of multiple CPU cores since S3 reads are relatively compute
// expensive (SSL and JNI buffer overheads).
DEFINE_int32(num_s3_io_threads, 16, "number of S3 I/O threads");
// The maximum number of ADLS I/O threads. This number is a good default to have for
// clusters that may vary widely in size, due to an undocumented concurrency limit
// enforced by ADLS for a cluster, which spans between 500-700. For smaller clusters
// (~10 nodes), 64 threads would be more ideal.
DEFINE_int32(num_adls_io_threads, 16, "number of ADLS I/O threads");
// The read size is the size of the reads sent to hdfs/os.
// There is a trade-off between latency and throughput, trying to keep disks busy but
// not introduce seeks. The literature seems to agree that with 8 MB reads, random
// io and sequential io perform similarly.
DEFINE_int32(read_size, 8 * 1024 * 1024, "Read Size (in bytes)");
// Presumably seeds the IoMgr's min_buffer_size_, the unit GetFreeBuffer() uses when it
// quantizes buffer sizes to power-of-two multiples (flag-to-member wiring not shown here).
DEFINE_int32(min_buffer_size, 1024, "The minimum read buffer size (in bytes)");
// With 1024B through 8MB buffers, this is up to ~2GB of buffers.
// FreeBufferMemory() keeps a freed buffer on its size's free list only while that list
// holds fewer than this many buffers; otherwise the buffer is deleted.
DEFINE_int32(max_free_io_buffers, 128,
"For each io buffer size, the maximum number of buffers the IoMgr will hold onto");
// The number of cached file handles defines how much memory can be used per backend for
// caching frequently used file handles. Currently, we assume that approximately 2kB data
// are associated with a single file handle. 10k file handles will thus reserve ~20MB
// data. The actual amount of memory that is associated with a file handle can be larger
// or smaller, depending on the replication factor for this file or the path name.
// A value of 0 disables caching (see detail::is_file_handle_caching_enabled()).
DEFINE_uint64(max_cached_file_handles, 0, "Maximum number of HDFS file handles "
"that will be cached. Disabled if set to 0.");
// Default worker-thread counts per local disk, used by Init() when no explicit
// per-disk thread count is configured. Rotational disks get a single thread so that
// concurrent requests do not cause seeks; flash disks have no seek penalty and
// benefit from several concurrent IO requests.
static constexpr int THREADS_PER_ROTATIONAL_DISK = 1;
static constexpr int THREADS_PER_FLASH_DISK = 8;

// Memory threshold (64 MB). The IoMgr is able to run with a wide range of memory
// usage; a query whose remaining memory drops below this value makes the IoMgr stop
// all buffering regardless of the current queue size.
static constexpr int LOW_MEMORY = 64 << 20;
// Out-of-line definition of the static member declared in disk-io-mgr.h.
// NOTE(review): presumably the default capacity of a scan range's ready-buffer queue;
// its users are not visible in this chunk — confirm against the header.
const int DiskIoMgr::DEFAULT_QUEUE_CAPACITY = 2;
// Out-of-line definition of the static atomic counter declared in the header.
// NOTE(review): presumably used to hand out disk ids; its users are not visible here.
AtomicInt32 DiskIoMgr::next_disk_id_;
namespace detail {
// Returns true when HDFS file handles should be cached, i.e. whenever
// --max_cached_file_handles is non-zero. The flag is unsigned, so "non-zero" and
// "greater than zero" are the same test.
static inline bool is_file_handle_caching_enabled() {
  return FLAGS_max_cached_file_handles != 0;
}
}  // namespace detail
/// Eviction callback for the file handle cache (registered as
/// &HdfsCachedFileHandle::Release). Logs the evicted handle and destroys it; the
/// destructor closes the underlying HDFS file.
void DiskIoMgr::HdfsCachedFileHandle::Release(DiskIoMgr::HdfsCachedFileHandle** h) {
  HdfsCachedFileHandle* evicted_handle = *h;
  VLOG_FILE << "Cached file handle evicted, hdfsCloseFile() fid="
            << evicted_handle->hdfs_file_;
  delete evicted_handle;
}
/// Opens 'fname' read-only on filesystem 'fs' and records 'mtime' (presumably the file's
/// modification time). hdfs_file_ holds whatever hdfsOpenFile() returned; no error
/// check happens here.
DiskIoMgr::HdfsCachedFileHandle::HdfsCachedFileHandle(
    const hdfsFS& fs, const char* fname, int64_t mtime)
  : fs_(fs),
    hdfs_file_(hdfsOpenFile(fs, fname, O_RDONLY, 0, 0, 0)),
    mtime_(mtime) {
  VLOG_FILE << "hdfsOpenFile() file=" << fname << " fid=" << hdfs_file_;
}
/// Closes the HDFS file if both the file and the filesystem handle are set, then clears
/// both so the handle is left in the "closed" state.
DiskIoMgr::HdfsCachedFileHandle::~HdfsCachedFileHandle() {
  if (hdfs_file_ == NULL || fs_ == NULL) return;
  VLOG_FILE << "hdfsCloseFile() fid=" << hdfs_file_;
  hdfsCloseFile(fs_, hdfs_file_);
  fs_ = NULL;
  hdfs_file_ = NULL;
}
// This class provides a cache of DiskIoRequestContext objects. DiskIoRequestContexts
// are recycled. This is good for locality as well as lock contention. The cache has
// the property that regardless of how many clients get added/removed, the memory
// locations for existing clients do not change (not the case with std::vector)
// minimizing the locks we have to take across all readers.
// All functions on this object are thread safe
// (except ValidateAllInactive(), which takes no locks; see its comment).
// NOTE(review): this chunk does not show the access specifiers, the closing braces, or
// any statement that inserts into or removes from 'all_contexts_' / 'inactive_contexts_';
// the per-method notes below flag where such statements appear to be missing.
class DiskIoMgr::RequestContextCache {
RequestContextCache(DiskIoMgr* io_mgr) : io_mgr_(io_mgr) {}
// Returns a context to the cache. This object can now be reused.
// 'reader' must not already be Inactive; it is marked Inactive before 'lock_' is taken.
// NOTE(review): as visible, 'reader' is never appended to 'inactive_contexts_' after the
// lock is acquired; presumably that line is missing from this chunk — confirm.
void ReturnContext(DiskIoRequestContext* reader) {
DCHECK(reader->state_ != DiskIoRequestContext::Inactive);
reader->state_ = DiskIoRequestContext::Inactive;
lock_guard<mutex> l(lock_);
// Returns a new DiskIoRequestContext object. Allocates a new object if necessary.
// Reuses the context at the front of 'inactive_contexts_' when there is one; otherwise
// allocates a context sized for all of the IoMgr's disks (num_total_disks()).
// NOTE(review): as visible, a reused context is not popped from 'inactive_contexts_' and
// a freshly allocated one is not recorded in 'all_contexts_' (which the destructor uses
// to free contexts); presumably those lines are missing from this chunk — confirm.
DiskIoRequestContext* GetNewContext() {
lock_guard<mutex> l(lock_);
if (!inactive_contexts_.empty()) {
DiskIoRequestContext* reader = inactive_contexts_.front();
return reader;
} else {
DiskIoRequestContext* reader =
new DiskIoRequestContext(io_mgr_, io_mgr_->num_total_disks());
return reader;
// This object has the same lifetime as the disk IoMgr.
// Deletes every context recorded in 'all_contexts_'.
~RequestContextCache() {
for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin();
it != all_contexts_.end(); ++it) {
delete *it;
// Validates that all readers are cleaned up and in the inactive state. No locks
// are taken since this is only called from the disk IoMgr destructor.
// Returns false if any created context is not Inactive; otherwise DCHECKs and returns
// whether every created context is back on the inactive list.
bool ValidateAllInactive() {
for (list<DiskIoRequestContext*>::iterator it = all_contexts_.begin();
it != all_contexts_.end(); ++it) {
if ((*it)->state_ != DiskIoRequestContext::Inactive) {
return false;
DCHECK_EQ(all_contexts_.size(), inactive_contexts_.size());
return all_contexts_.size() == inactive_contexts_.size();
// One line per created context, each printed while holding that context's lock
// (defined out of line).
string DebugString();
// The IoMgr this cache belongs to; used to size newly allocated contexts.
DiskIoMgr* io_mgr_;
// lock to protect all members below
mutex lock_;
// List of all request contexts created. Used for debugging, and by the destructor to
// free every context.
list<DiskIoRequestContext*> all_contexts_;
// List of inactive readers. These objects can be used for a new reader.
list<DiskIoRequestContext*> inactive_contexts_;
// Dumps every context the cache has created, one per line. Holds the cache lock for the
// whole walk, and each context's own lock while that context is printed.
string DiskIoMgr::RequestContextCache::DebugString() {
  lock_guard<mutex> cache_lock(lock_);
  stringstream out;
  for (DiskIoRequestContext* context : all_contexts_) {
    unique_lock<mutex> context_lock(context->lock_);
    out << context->DebugString() << endl;
  }
  return out.str();
}
// Human-readable dump of the IoMgr state: all request contexts (via the context cache),
// then one line per disk queue listing the address of each context queued on it. Each
// disk queue's lock is held while that queue is printed.
string DiskIoMgr::DebugString() {
  stringstream ss;
  ss << "RequestContexts: " << endl << request_context_cache_->DebugString() << endl;
  ss << "Disks: " << endl;
  for (DiskQueue* disk_queue : disk_queues_) {
    unique_lock<mutex> lock(disk_queue->lock);
    ss << " " << static_cast<void*>(disk_queue) << ":";
    if (!disk_queue->request_contexts.empty()) {
      ss << " Readers:";
      for (DiskIoRequestContext* req_context : disk_queue->request_contexts) {
        // Separate the addresses; printed back-to-back they are ambiguous.
        ss << " " << static_cast<void*>(req_context);
      }
    }
    ss << endl;
  }
  return ss.str();
}
// Creates a descriptor that remembers the IoMgr it belongs to (GetBufferDesc() creates
// descriptors via pool_.Add()).
// NOTE(review): the rest of this constructor's body is not visible in this chunk.
DiskIoMgr::BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr) : io_mgr_(io_mgr) {
/// Returns the descriptor to its unused state: clears the reader, range, tracker and
/// buffer pointers, zeroes the sizes and offset, clears eosr_ and sets status_ to OK.
/// io_mgr_ is kept (and must be set).
void DiskIoMgr::BufferDescriptor::Reset() {
  DCHECK(io_mgr_ != NULL);
  // Pointers to the owning reader, range, tracker and memory.
  reader_ = NULL;
  scan_range_ = NULL;
  mem_tracker_ = NULL;
  buffer_ = NULL;
  // Sizes and position.
  buffer_len_ = 0;
  len_ = 0;
  scan_range_offset_ = 0;
  // Flags and status.
  eosr_ = false;
  status_ = Status::OK();
}
/// Re-initializes this descriptor for 'buffer' ('buffer_len' bytes) used by 'reader'
/// for 'range'. The descriptor must not currently hold a buffer. 'mem_tracker' must be
/// non-NULL exactly when the range has no external buffer (ExternalBufferTag::NO_BUFFER).
/// The read state (len_, scan_range_offset_, eosr_, status_) starts out empty.
void DiskIoMgr::BufferDescriptor::Reset(DiskIoRequestContext* reader, ScanRange* range,
    uint8_t* buffer, int64_t buffer_len, MemTracker* mem_tracker) {
  DCHECK(io_mgr_ != NULL);
  DCHECK(buffer_ == NULL);
  DCHECK(range != NULL);
  DCHECK(buffer != NULL);
  DCHECK_GE(buffer_len, 0);
  DCHECK_NE(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER,
      mem_tracker == NULL);
  // Ownership and memory.
  reader_ = reader;
  scan_range_ = range;
  mem_tracker_ = mem_tracker;
  buffer_ = buffer;
  buffer_len_ = buffer_len;
  // Nothing has been read into the buffer yet.
  len_ = 0;
  scan_range_offset_ = 0;
  eosr_ = false;
  status_ = Status::OK();
}
// Makes 'dst' the tracker for this buffer. Cached buffers are not tracked, so nothing
// changes for them; otherwise the descriptor must already have a tracker.
// NOTE(review): as visible, only 'mem_tracker_' is reassigned; no Consume()/Release()
// moves the buffer's bytes from the old tracker to 'dst'. Unless those lines are merely
// missing from this chunk, the old tracker keeps the charge and 'dst' never sees it.
void DiskIoMgr::BufferDescriptor::TransferOwnership(MemTracker* dst) {
DCHECK(dst != NULL);
// Memory of cached buffers is not tracked against a tracker.
if (is_cached()) return;
DCHECK(mem_tracker_ != NULL);
mem_tracker_ = dst;
// Presumably hands this buffer back to its IoMgr.
// NOTE(review): the body of Return() beyond its DCHECK is not visible in this chunk.
void DiskIoMgr::BufferDescriptor::Return() {
DCHECK(io_mgr_ != NULL);
// NOTE(review): the following lines are the tail of the WriteRange constructor; its
// opening line is not visible in this chunk. It marks the range as a WRITE, stores the
// completion callback and sets file/offset/disk through SetRange().
const string& file, int64_t file_offset, int disk_id, WriteDoneCallback callback)
: RequestRange(RequestType::WRITE), callback_(callback) {
SetRange(file, file_offset, disk_id);
/// Points this write range at byte offset 'file_offset' of 'file' on disk 'disk_id'.
void DiskIoMgr::WriteRange::SetRange(
    const std::string& file, int64_t file_offset, int disk_id) {
  disk_id_ = disk_id;
  offset_ = file_offset;
  file_ = file;
}
/// Sets the 'len' bytes starting at 'buffer' as the data to write. Only the pointer is
/// stored (no copy), so 'buffer' presumably must stay valid until the write completes.
void DiskIoMgr::WriteRange::SetData(const uint8_t* buffer, int64_t len) {
  len_ = len;
  data_ = buffer;
}
// Logs a warning when the CPU lacks SSE4.2, since the default IO configuration is
// suboptimal on such hardware; does nothing otherwise.
static void CheckSseSupport() {
  if (CpuInfo::IsSupported(CpuInfo::SSE4_2)) return;
  LOG(WARNING) << "This machine does not support sse4_2. The default IO system "
                  "configurations are suboptimal for this hardware. Consider "
                  "increasing the number of threads per disk by restarting impalad "
                  "using the --num_threads_per_disk flag with a higher value";
}
// Default constructor. The number of local disks comes from --num_disks (or DiskInfo).
// NOTE(review): the member-initializer list (only its final fragment survives on the
// next line) and the closing brace are not visible in this chunk.
DiskIoMgr::DiskIoMgr() :
&HdfsCachedFileHandle::Release) {
// One free list per power-of-two multiple of min_buffer_size_ up to max_buffer_size_:
// ceil(log2(ceil(max / min))) + 1 buckets.
int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
// --num_disks=0 means "use the number of disks DiskInfo detected".
int num_local_disks = FLAGS_num_disks == 0 ? DiskInfo::num_disks() : FLAGS_num_disks;
// Local disk queues plus REMOTE_NUM_DISKS extra queues (presumably the remote queues
// Init() identifies via RemoteDfsDiskId()/RemoteS3DiskId()/RemoteAdlsDiskId()). The
// queue objects themselves are created in Init().
disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS);
// Constructor with explicit configuration; 'num_local_disks' of 0 means "use
// DiskInfo::num_disks()".
// NOTE(review): most of the member-initializer list and the closing brace are not
// visible in this chunk, so how 'threads_per_disk', 'min_buffer_size' and
// 'max_buffer_size' reach the members is not shown here.
DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_disk, int min_buffer_size,
int max_buffer_size) :
FileSystemUtil::MaxNumFileHandles()), &HdfsCachedFileHandle::Release) {
// One free list per power-of-two multiple of min_buffer_size_ up to max_buffer_size_.
int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
if (num_local_disks == 0) num_local_disks = DiskInfo::num_disks();
// Local disk queues plus REMOTE_NUM_DISKS remote queues; created later in Init().
disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS);
// Shuts the IoMgr down. Visible steps:
// - set shut_down_;
// - take each disk queue's lock, so the worker threads can be notified;
// - run debug-only checks that no context or buffer is still in use;
// - delete the disk queues;
// - unregister the free-buffer MemTracker;
// - free the cached-read options.
// NOTE(review): several statements are not visible in this chunk: waking/joining the
// disk threads, part of the context-cache DCHECK, releasing the free buffers, and the
// closing braces.
DiskIoMgr::~DiskIoMgr() {
shut_down_ = true;
// Notify all worker threads and shut them down.
for (int i = 0; i < disk_queues_.size(); ++i) {
if (disk_queues_[i] == NULL) continue;
// This lock is necessary to properly use the condition var to notify
// the disk worker threads. The readers also grab this lock so updates
// to shut_down_ are protected.
unique_lock<mutex> disk_lock(disk_queues_[i]->lock);
// Debug check: for every context still queued on a disk, no disk thread may be in the
// middle of an operation for it on that disk.
for (int i = 0; i < disk_queues_.size(); ++i) {
if (disk_queues_[i] == NULL) continue;
int disk_id = disk_queues_[i]->disk_id;
for (list<DiskIoRequestContext*>::iterator it = disk_queues_[i]->request_contexts.begin();
it != disk_queues_[i]->request_contexts.end(); ++it) {
DCHECK_EQ((*it)->disk_states_[disk_id].num_threads_in_op(), 0);
// NOTE(review): this DCHECK's condition is cut off in this chunk. It presumably calls
// request_context_cache_->ValidateAllInactive(), which is documented as being called
// only from this destructor.
DCHECK(request_context_cache_.get() == NULL ||
<< endl << DebugString();
// Every buffer handed to a reader must have been returned.
DCHECK_EQ(num_buffers_in_readers_.Load(), 0);
// Delete all allocated buffers
// (The visible lines only count the buffers on the free lists and check that every
// allocated buffer is among them.)
int num_free_buffers = 0;
for (int idx = 0; idx < free_buffers_.size(); ++idx) {
num_free_buffers += free_buffers_[idx].size();
DCHECK_EQ(num_allocated_buffers_.Load(), num_free_buffers);
for (int i = 0; i < disk_queues_.size(); ++i) {
delete disk_queues_[i];
if (free_buffer_mem_tracker_ != NULL) free_buffer_mem_tracker_->UnregisterFromParent();
if (cached_read_options_ != NULL) hadoopRzOptionsFree(cached_read_options_);
// Creates the disk queues and their worker threads, the request-context cache, and the
// hadoopRzOptions used for cached HDFS reads. Worker threads per queue:
// - the remote HDFS, S3 and ADLS queues use their respective flags;
// - local disks use num_threads_per_disk_ when it is non-zero;
// - otherwise a local disk gets the rotational or flash default.
// NOTE(review): the start of the free-buffer MemTracker statement and the closing braces
// are not visible in this chunk.
Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
DCHECK(process_mem_tracker != NULL);
// NOTE(review): the beginning of this statement is missing here. It presumably stores
// the new tracker in free_buffer_mem_tracker_, which ~DiskIoMgr unregisters.
new MemTracker(-1, "Free Disk IO Buffers", process_mem_tracker, false));
for (int i = 0; i < disk_queues_.size(); ++i) {
disk_queues_[i] = new DiskQueue(i);
int num_threads_per_disk;
if (i == RemoteDfsDiskId()) {
num_threads_per_disk = FLAGS_num_remote_hdfs_io_threads;
} else if (i == RemoteS3DiskId()) {
num_threads_per_disk = FLAGS_num_s3_io_threads;
} else if (i == RemoteAdlsDiskId()) {
num_threads_per_disk = FLAGS_num_adls_io_threads;
} else if (num_threads_per_disk_ != 0) {
num_threads_per_disk = num_threads_per_disk_;
} else if (DiskInfo::is_rotational(i)) {
num_threads_per_disk = THREADS_PER_ROTATIONAL_DISK;
} else {
num_threads_per_disk = THREADS_PER_FLASH_DISK;
// Start one WorkLoop() thread per configured thread, each serving this disk's queue.
for (int j = 0; j < num_threads_per_disk; ++j) {
stringstream ss;
ss << "work-loop(Disk: " << i << ", Thread: " << j << ")";
disk_thread_group_.AddThread(new Thread("disk-io-mgr", ss.str(),
&DiskIoMgr::WorkLoop, this, disk_queues_[i]));
request_context_cache_.reset(new RequestContextCache(this));
cached_read_options_ = hadoopRzOptionsAlloc();
DCHECK(cached_read_options_ != NULL);
// Disable checksumming for cached reads.
int ret = hadoopRzOptionsSetSkipChecksum(cached_read_options_, true);
DCHECK_EQ(ret, 0);
// Disable automatic fallback for cached reads.
ret = hadoopRzOptionsSetByteBufferPool(cached_read_options_, NULL);
DCHECK_EQ(ret, 0);
return Status::OK();
// Hands out a (possibly recycled) request context from the context cache in
// *request_context. Init() must have been called first.
// NOTE(review): 'mem_tracker' is not used by the visible lines; presumably a statement
// that resets the new context with it is missing from this chunk.
void DiskIoMgr::RegisterContext(DiskIoRequestContext** request_context,
MemTracker* mem_tracker) {
DCHECK(request_context_cache_.get() != NULL) << "Must call Init() first.";
*request_context = request_context_cache_->GetNewContext();
// Cancels 'reader' and waits until no disk still has ranges for it, then (in debug
// builds) checks under the reader's lock that it holds no buffers and is consistent.
// NOTE(review): returning the context to request_context_cache_ is not visible in this
// chunk; presumably it follows these checks.
void DiskIoMgr::UnregisterContext(DiskIoRequestContext* reader) {
// Blocking cancel (waiting for disks completion).
CancelContext(reader, true);
// All the disks are done with cleanup; validate that nothing is leaking.
unique_lock<mutex> reader_lock(reader->lock_);
DCHECK_EQ(reader->num_buffers_in_reader_.Load(), 0) << endl << reader->DebugString();
DCHECK_EQ(reader->num_used_buffers_.Load(), 0) << endl << reader->DebugString();
DCHECK(reader->Validate()) << endl << reader->DebugString();
// Cancellation requires coordination from multiple threads. Each thread that currently
// has a reference to the request context must notice the cancel and remove it from its
// tracking structures. The last thread to touch the context should deallocate (aka
// recycle) the request context object. Potential threads are:
// 1. Disk threads that are currently reading for this reader.
// 2. Caller threads that are waiting in GetNext.
// The steps are:
// 1. Cancel will immediately set the context in the Cancelled state. This prevents any
// other thread from adding more ready buffers to the context (they all take a lock and
// check the state before doing so), or any write ranges to the context.
// 2. Cancel will call cancel on each ScanRange that is not yet complete, unblocking
// any threads in GetNext(). The reader will see the cancelled Status returned. Cancel
// also invokes the callback for the WriteRanges with the cancelled state.
// 3. Disk threads notice the context is cancelled either when picking the next context
// to process or when they try to enqueue a ready buffer. Upon noticing the cancelled
// state, removes the context from the disk queue. The last thread per disk with an
// outstanding reference to the context decrements the number of disk queues the context
// is on.
// If wait_for_disks_completion is true, wait for the number of active disks to become 0.
// NOTE(review): only the waiting part of this function is visible in this chunk. The
// call that actually cancels the context, the wait loop's body and the closing braces
// are missing here.
void DiskIoMgr::CancelContext(DiskIoRequestContext* context, bool wait_for_disks_completion) {
if (wait_for_disks_completion) {
unique_lock<mutex> lock(context->lock_);
DCHECK(context->Validate()) << endl << context->DebugString();
while (context->num_disks_with_ranges_ > 0) {
void DiskIoMgr::set_read_timer(DiskIoRequestContext* r, RuntimeProfile::Counter* c) {
r->read_timer_ = c;
void DiskIoMgr::set_bytes_read_counter(DiskIoRequestContext* r, RuntimeProfile::Counter* c) {
r->bytes_read_counter_ = c;
void DiskIoMgr::set_active_read_thread_counter(DiskIoRequestContext* r,
RuntimeProfile::Counter* c) {
r->active_read_thread_counter_ = c;
void DiskIoMgr::set_disks_access_bitmap(DiskIoRequestContext* r,
RuntimeProfile::Counter* c) {
r->disks_accessed_bitmap_ = c;
int64_t DiskIoMgr::queue_size(DiskIoRequestContext* reader) const {
return reader->num_ready_buffers_.Load();
Status DiskIoMgr::context_status(DiskIoRequestContext* context) const {
unique_lock<mutex> lock(context->lock_);
return context->status_;
int64_t DiskIoMgr::bytes_read_local(DiskIoRequestContext* reader) const {
return reader->bytes_read_local_.Load();
int64_t DiskIoMgr::bytes_read_short_circuit(DiskIoRequestContext* reader) const {
return reader->bytes_read_short_circuit_.Load();
int64_t DiskIoMgr::bytes_read_dn_cache(DiskIoRequestContext* reader) const {
return reader->bytes_read_dn_cache_.Load();
int DiskIoMgr::num_remote_ranges(DiskIoRequestContext* reader) const {
return reader->num_remote_ranges_.Load();
int64_t DiskIoMgr::unexpected_remote_bytes(DiskIoRequestContext* reader) const {
return reader->unexpected_remote_bytes_.Load();
int64_t DiskIoMgr::GetReadThroughput() {
return RuntimeProfile::UnitsPerSecond(&total_bytes_read_counter_, &read_timer_);
/// Checks that 'range' names an existing disk queue and has a non-negative offset and
/// length. Returns an error Status describing the first problem found, or OK.
Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
  const int disk_id = range->disk_id_;
  // Short-circuiting guarantees disk_id is non-negative before the unsigned comparison.
  const bool disk_id_valid = disk_id >= 0 && disk_id < disk_queues_.size();
  if (!disk_id_valid) {
    return Status(Substitute("Invalid scan range. Bad disk id: $0", disk_id));
  }
  if (range->offset_ < 0) {
    return Status(Substitute("Invalid scan range. Negative offset $0", range->offset_));
  }
  if (range->len_ < 0) {
    return Status(Substitute("Invalid scan range. Negative length $0", range->len_));
  }
  return Status::OK();
}
// Adds 'ranges' to 'reader'; 'schedule_immediately' is forwarded to AddRequestRange().
// An empty 'ranges' is a no-op. If the reader has been cancelled, returns its status.
// NOTE(review): several statements and closing braces of this function are not visible
// in this chunk; see the notes below.
Status DiskIoMgr::AddScanRanges(DiskIoRequestContext* reader,
const vector<ScanRange*>& ranges, bool schedule_immediately) {
if (ranges.empty()) return Status::OK();
// Validate and initialize all ranges
// NOTE(review): only InitInternal() is visible here. The validation step (presumably
// ValidateScanRange()) is not shown in this chunk.
for (int i = 0; i < ranges.size(); ++i) {
ranges[i]->InitInternal(this, reader);
// Lock the reader, then add the ranges to the queues of the disks that this reader
// needs to be scheduled on.
unique_lock<mutex> reader_lock(reader->lock_);
DCHECK(reader->Validate()) << endl << reader->DebugString();
if (reader->state_ == DiskIoRequestContext::Cancelled) {
return reader->status_;
// Add each range to the queue of the disk the range is on
for (int i = 0; i < ranges.size(); ++i) {
// Don't add empty ranges.
DCHECK_NE(ranges[i]->len(), 0);
ScanRange* range = ranges[i];
if (range->try_cache_) {
if (schedule_immediately) {
// NOTE(review): as visible, 'cached_read_succeeded' is read without ever being
// assigned (undefined behavior). Presumably the cached-read call that sets it is
// missing from this chunk — confirm.
bool cached_read_succeeded;
if (cached_read_succeeded) continue;
// Cached read failed, fall back to AddRequestRange() below.
// NOTE(review): the non-immediate cached path and the closing braces around here
// are not visible, so the branch structure shown is incomplete.
} else {
reader->AddRequestRange(range, schedule_immediately);
DCHECK(reader->Validate()) << endl << reader->DebugString();
return Status::OK();
/// Convenience wrapper: adds the single 'range' to 'reader' via AddScanRanges().
Status DiskIoMgr::AddScanRange(
    DiskIoRequestContext* reader, ScanRange* range, bool schedule_immediately) {
  const vector<ScanRange*> single_range(1, range);
  return AddScanRanges(reader, single_range, schedule_immediately);
}
// This function returns the next scan range the reader should work on, checking
// for eos and error cases. If there isn't already a cached scan range or a scan
// range prepared by the disk threads, the caller waits on the disk threads.
// On return, *range is the chosen range or NULL. The returned status is the reader's
// status if it was cancelled, OK otherwise.
// NOTE(review): many statements of this function are not visible in this chunk: the
// loop exits, the cached-read call, the wait on the reader, the bookkeeping after a
// range is dequeued, and the closing braces.
Status DiskIoMgr::GetNextRange(DiskIoRequestContext* reader, ScanRange** range) {
DCHECK(reader != NULL);
DCHECK(range != NULL);
*range = NULL;
Status status = Status::OK();
unique_lock<mutex> reader_lock(reader->lock_);
DCHECK(reader->Validate()) << endl << reader->DebugString();
while (true) {
if (reader->state_ == DiskIoRequestContext::Cancelled) {
status = reader->status_;
if (reader->num_unstarted_scan_ranges_.Load() == 0 &&
reader->ready_to_start_ranges_.empty() && reader->cached_ranges_.empty()) {
// All ranges are done, just return.
if (!reader->cached_ranges_.empty()) {
// We have a cached range.
*range = reader->cached_ranges_.Dequeue();
// NOTE(review): as visible, 'cached_read_succeeded' is read uninitialized; presumably
// the cached-read call that sets it is missing from this chunk — confirm.
bool cached_read_succeeded;
if (cached_read_succeeded) return Status::OK();
// This range ended up not being cached. Loop again and pick up a new range.
reader->AddRequestRange(*range, false);
DCHECK(reader->Validate()) << endl << reader->DebugString();
*range = NULL;
if (reader->ready_to_start_ranges_.empty()) {
} else {
*range = reader->ready_to_start_ranges_.Dequeue();
DCHECK(*range != NULL);
int disk_id = (*range)->disk_id();
DCHECK_EQ(*range, reader->disk_states_[disk_id].next_scan_range_to_start());
// Set this to NULL, the next time this disk runs for this reader, it will
// get another range ready.
// NOTE(review): the statement this comment describes is not visible in this chunk.
return status;
// Synchronously reads 'range' for 'reader' and returns the resulting buffer in *buffer.
// Fails with an internal error if the range is larger than max_buffer_size_, unless it
// reads into a client-provided buffer.
// NOTE(review): as visible, 'range' is never added to 'ranges' and nothing fetches the
// buffer, so AddScanRanges() would return immediately and the final DCHECK would fail.
// Presumably those statements are missing from this chunk — confirm.
Status DiskIoMgr::Read(DiskIoRequestContext* reader,
ScanRange* range, BufferDescriptor** buffer) {
DCHECK(range != NULL);
DCHECK(buffer != NULL);
*buffer = NULL;
if (range->len() > max_buffer_size_
&& range->external_buffer_tag_ != ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
return Status(Substitute("Internal error: cannot perform sync read of '$0' bytes "
"that is larger than the max read buffer size '$1'.",
range->len(), max_buffer_size_));
vector<DiskIoMgr::ScanRange*> ranges;
RETURN_IF_ERROR(AddScanRanges(reader, ranges, true));
DCHECK((*buffer) != NULL);
return Status::OK();
// Returns a buffer obtained from a scan range to the IoMgr. A descriptor carrying an
// error must not carry a buffer. Only buffers the IoMgr allocated itself (neither cached
// nor client-provided) need freeing. If this was the range's last buffer (eosr) or the
// range was cancelled, the scan range must be closed as well.
// NOTE(review): several statements are not visible in this chunk: the free call, the
// counter updates, the Close() call, and the closing braces.
void DiskIoMgr::ReturnBuffer(BufferDescriptor* buffer_desc) {
DCHECK(buffer_desc != NULL);
if (!buffer_desc->status_.ok()) DCHECK(buffer_desc->buffer_ == NULL);
DiskIoRequestContext* reader = buffer_desc->reader_;
if (buffer_desc->buffer_ != NULL) {
if (!buffer_desc->is_cached() && !buffer_desc->is_client_buffer()) {
// Buffers that were not allocated by DiskIoMgr (cached or client buffers) don't need
// to be freed; only IoMgr-allocated buffers reach this branch.
} else {
// A NULL buffer means there was an error in which case there is no buffer
// to return.
if (buffer_desc->eosr_ || buffer_desc->scan_range_->is_cancelled_) {
// Need to close the scan range if returning the last buffer or the scan range
// has been cancelled (and the caller might never get the last buffer).
// Close() is idempotent so multiple cancelled buffers is okay.
// Puts 'desc' back on the free descriptor list. Debug builds check, under
// free_buffers_lock_, that it is not already on that list.
// NOTE(review): the statement that appends 'desc' to free_buffer_descs_ is not visible
// in this chunk.
void DiskIoMgr::ReturnBufferDesc(BufferDescriptor* desc) {
DCHECK(desc != NULL);
unique_lock<mutex> lock(free_buffers_lock_);
DCHECK(find(free_buffer_descs_.begin(), free_buffer_descs_.end(), desc)
== free_buffer_descs_.end());
// Returns a descriptor initialized (via Reset()) for 'buffer' of 'buffer_size' bytes in
// 'range'. It reuses the front of free_buffer_descs_ when available; otherwise it
// allocates a new descriptor through pool_.Add().
// NOTE(review): as visible, a reused descriptor is not removed from free_buffer_descs_
// and Reset() seems to run under free_buffers_lock_. Presumably a pop_front() and the end
// of the locked scope are missing from this chunk — confirm.
DiskIoMgr::BufferDescriptor* DiskIoMgr::GetBufferDesc(DiskIoRequestContext* reader,
MemTracker* mem_tracker, ScanRange* range, uint8_t* buffer, int64_t buffer_size) {
BufferDescriptor* buffer_desc;
unique_lock<mutex> lock(free_buffers_lock_);
if (free_buffer_descs_.empty()) {
buffer_desc = pool_.Add(new BufferDescriptor(this));
} else {
buffer_desc = free_buffer_descs_.front();
buffer_desc->Reset(reader, range, buffer, buffer_size, mem_tracker);
return buffer_desc;
// Returns a descriptor for a buffer of at least 'buffer_size' bytes (capped at
// max_buffer_size_) for 'range'. The buffer is taken from the free list of its size
// bucket or newly allocated, and the descriptor is tied to the reader's MemTracker.
// NOTE(review): many statements are not visible in this chunk: metric updates, memory
// accounting, the free-list pop, and the closing braces.
DiskIoMgr::BufferDescriptor* DiskIoMgr::GetFreeBuffer(DiskIoRequestContext* reader,
ScanRange* range, int64_t buffer_size) {
DCHECK_LE(buffer_size, max_buffer_size_);
DCHECK_GT(buffer_size, 0);
// Defensive clamp: the DCHECK above is compiled out of release builds.
buffer_size = min(static_cast<int64_t>(max_buffer_size_), buffer_size);
int idx = free_buffers_idx(buffer_size);
// Quantize buffer size to nearest power of 2 greater than the specified buffer size and
// convert to bytes: the bucket size is (2^idx) * min_buffer_size_ bytes.
buffer_size = (1LL << idx) * min_buffer_size_;
// Track memory against the reader. This is checked the next time we start
// a read for the next reader in DiskIoMgr::GetNextRequestRange().
DCHECK(reader->mem_tracker_ != NULL);
uint8_t* buffer = NULL;
unique_lock<mutex> lock(free_buffers_lock_);
if (free_buffers_[idx].empty()) {
if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != NULL) {
// We already tracked this memory against the reader's MemTracker.
buffer = new uint8_t[buffer_size];
} else {
if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
buffer = free_buffers_[idx].front();
// Validate more invariants.
DCHECK(range != NULL);
DCHECK(reader != NULL);
DCHECK(buffer != NULL);
return GetBufferDesc(reader, reader->mem_tracker_, range, buffer, buffer_size);
// Frees buffers held on the free lists, smallest bucket first, until at least
// 'bytes_to_free' bytes have been freed. A value of -1 means free everything.
// NOTE(review): 'bytes_freed' is an int, while 'buffer_size' and 'bytes_to_free' are
// int64_t. Freeing more than INT_MAX bytes (reachable with a larger --max_free_io_buffers
// or larger buffers) would overflow it; consider int64_t.
// NOTE(review): as visible, when 'bytes_freed' ends exactly at 'bytes_to_free', the next
// non-empty bucket's while condition ('<=') admits one more buffer before the inner
// break fires. '<' would avoid that, unless an outer break is among the missing lines.
// NOTE(review): the pop from the free list, the counter/metric updates and the closing
// braces are not visible in this chunk.
void DiskIoMgr::GcIoBuffers(int64_t bytes_to_free) {
unique_lock<mutex> lock(free_buffers_lock_);
int buffers_freed = 0;
int bytes_freed = 0;
// Free small-to-large to avoid retaining many small buffers and fragmenting memory.
for (int idx = 0; idx < free_buffers_.size(); ++idx) {
std::list<uint8_t*>* free_buffers = &free_buffers_[idx];
while (
!free_buffers->empty() && (bytes_to_free == -1 || bytes_freed <= bytes_to_free)) {
uint8_t* buffer = free_buffers->front();
int64_t buffer_size = (1LL << idx) * min_buffer_size_;
delete[] buffer;
bytes_freed += buffer_size;
if (bytes_to_free != -1 && bytes_freed >= bytes_to_free) break;
if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != NULL) {
if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
// Takes ownership of desc's buffer and then clears desc->buffer_. The buffer either goes
// back on the free list for its size or is deleted. It is kept only when
// --disable_mem_pools is off and that list holds fewer than --max_free_io_buffers
// buffers.
// NOTE(review): the push onto the free list, the memory-tracker and metric updates, and
// the closing braces are not visible in this chunk.
void DiskIoMgr::FreeBufferMemory(BufferDescriptor* desc) {
uint8_t* buffer = desc->buffer_;
int64_t buffer_size = desc->buffer_len_;
int idx = free_buffers_idx(buffer_size);
DCHECK_EQ(BitUtil::Ceil(buffer_size, min_buffer_size_) & ~(1LL << idx), 0)
<< "buffer_size_ / min_buffer_size_ should be power of 2, got buffer_size = "
<< buffer_size << ", min_buffer_size_ = " << min_buffer_size_;
unique_lock<mutex> lock(free_buffers_lock_);
if (!FLAGS_disable_mem_pools &&
free_buffers_[idx].size() < FLAGS_max_free_io_buffers) {
if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != NULL) {
// This consume call needs to be protected by 'free_buffers_lock_' to avoid a race
// with a Release() call for the same buffer that could make consumption negative.
// Note: we can't use TryConsume(), which can indirectly call GcIoBuffers().
// TODO: after IMPALA-3200 is completed, we should be able to leverage the buffer
// pool's free lists, and remove these free lists.
} else {
delete[] buffer;
if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != NULL) {
if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != NULL) {
// We transferred the buffer ownership from the BufferDescriptor to the DiskIoMgr.
desc->buffer_ = NULL;
// This function gets the next RequestRange to work on for this disk. It checks for
// cancellation and
// a) Updates ready_to_start_ranges if there are no scan ranges queued for this disk.
// b) Adds an unstarted write range to in_flight_ranges_. The write range is processed
// immediately if there are no preceding scan ranges in in_flight_ranges_
// It blocks until work is available or the thread is shut down.
// Work is available if there is a DiskIoRequestContext with
// - A ScanRange with a buffer available, or
// - A WriteRange in unstarted_write_ranges_.
// Returns true with *range and *request_context set when work was found, and false once
// the IoMgr is shutting down.
// NOTE(review): many statements of this function are not visible in this chunk: the
// condition-variable wait, the dequeue of the picked context, the bodies of the
// memory-limit and cancellation branches, the enqueueing of new/write ranges, and the
// closing braces.
bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
DiskIoRequestContext** request_context) {
int disk_id = disk_queue->disk_id;
*range = NULL;
// This loop returns either with work to do or when the disk IoMgr shuts down.
while (true) {
*request_context = NULL;
DiskIoRequestContext::PerDiskState* request_disk_state = NULL;
unique_lock<mutex> disk_lock(disk_queue->lock);
while (!shut_down_ && disk_queue->request_contexts.empty()) {
// wait if there are no readers on the queue
// NOTE(review): the wait call and this loop's closing brace are not visible here.
if (shut_down_) break;
// Get the next reader and remove the reader so that another disk thread
// can't pick it up. It will be enqueued before issuing the read to HDFS
// so this is not a big deal (i.e. multiple disk threads can read for the
// same reader).
// TODO: revisit.
*request_context = disk_queue->request_contexts.front();
DCHECK(*request_context != NULL);
request_disk_state = &((*request_context)->disk_states_[disk_id]);
// NOTE: no locks were taken in between. We need to be careful about what state
// could have changed to the reader and disk in between.
// There are some invariants here. Only one disk thread can have the
// same reader here (the reader is removed from the queue). There can be
// other disk threads operating on this reader in other functions though.
// We just picked a reader. Before we may allocate a buffer on its behalf, check that
// it has not exceeded any memory limits (e.g. the query or process limit).
// TODO: once IMPALA-3200 is fixed, we should be able to remove the free lists and
// move these memory limit checks to GetFreeBuffer().
// Note that calling AnyLimitExceeded() can result in a call to GcIoBuffers().
// TODO: IMPALA-3209: we should not force a reader over its memory limit by
// pushing more buffers to it. Most readers can make progress and operate within
// a fixed memory limit.
if ((*request_context)->mem_tracker_ != NULL
&& (*request_context)->mem_tracker_->AnyLimitExceeded()) {
unique_lock<mutex> request_lock((*request_context)->lock_);
VLOG_FILE << "Disk (id=" << disk_id << ") reading for "
<< (*request_context)->DebugString();
// Check if reader has been cancelled
if ((*request_context)->state_ == DiskIoRequestContext::Cancelled) {
DCHECK_EQ((*request_context)->state_, DiskIoRequestContext::Active)
<< (*request_context)->DebugString();
if (request_disk_state->next_scan_range_to_start() == NULL &&
!request_disk_state->unstarted_scan_ranges()->empty()) {
// We don't have a range queued for this disk for what the caller should
// read next. Populate that. We want to have one range waiting to minimize
// wait time in GetNextRange.
ScanRange* new_range = request_disk_state->unstarted_scan_ranges()->Dequeue();
if ((*request_context)->num_unstarted_scan_ranges_.Load() == 0) {
// All the ranges have been started, notify everyone blocked on GetNextRange.
// Only one of them will get work so make sure to return NULL to the other
// caller threads.
} else {
// Always enqueue a WriteRange to be processed into in_flight_ranges_.
// This is done so in_flight_ranges_ does not exclusively contain ScanRanges.
// For now, enqueuing a WriteRange on each invocation of GetNextRequestRange()
// does not flood in_flight_ranges() with WriteRanges because the entire
// WriteRange is processed and removed from the queue after GetNextRequestRange()
// returns. (A DCHECK is used to ensure that writes do not exceed 8MB).
if (!request_disk_state->unstarted_write_ranges()->empty()) {
WriteRange* write_range = request_disk_state->unstarted_write_ranges()->Dequeue();
// Get the next scan range to work on from the reader. Only in_flight_ranges
// are eligible since the disk threads do not start new ranges on their own.
// There are no inflight ranges, nothing to do.
if (request_disk_state->in_flight_ranges()->empty()) {
DCHECK_GT(request_disk_state->num_remaining_ranges(), 0);
*range = request_disk_state->in_flight_ranges()->Dequeue();
DCHECK(*range != NULL);
// Now that we've picked a request range, put the context back on the queue so
// another thread can pick up another request range for this context.
request_disk_state->ScheduleContext(*request_context, disk_id);
DCHECK((*request_context)->Validate()) << endl << (*request_context)->DebugString();
return true;
return false;
// Completion handler for a write on 'writer'. The write's callback must run before the
// disk thread count is decremented (see the comment below); the writer's per-disk state
// is then updated under the writer's lock, with a separate path for a cancelled writer.
// NOTE(review): the callback invocation, most of the state updates and the closing
// braces are not visible in this chunk.
void DiskIoMgr::HandleWriteFinished(
DiskIoRequestContext* writer, WriteRange* write_range, const Status& write_status) {
// Copy disk_id before running callback: the callback may modify write_range.
int disk_id = write_range->disk_id_;
// Execute the callback before decrementing the thread count. Otherwise CancelContext()
// that waits for the disk ref count to be 0 will return, creating a race, e.g.
// between BufferedBlockMgr::WriteComplete() and BufferedBlockMgr::~BufferedBlockMgr().
// See IMPALA-1890.
// The status of the write does not affect the status of the writer context.
unique_lock<mutex> writer_lock(writer->lock_);
DCHECK(writer->Validate()) << endl << writer->DebugString();
DiskIoRequestContext::PerDiskState& state = writer->disk_states_[disk_id];
if (writer->state_ == DiskIoRequestContext::Cancelled) {
} else {
// Handles a buffer filled by ReadRange() for `reader`, under reader->lock_.
// - Cancelled context: the buffer's memory is freed (unless it is a client buffer)
//   and buffer_ is cleared.
// - Otherwise (DCHECKed to be Active): the buffer's status and eosr_ flag select one
//   of the three cases listed below, and the buffer is handed to its scan range with
//   EnqueueBuffer().
// @param disk_queue  queue of the disk thread that performed the read.
// @param reader      context the scan range belongs to.
// @param buffer      the buffer the read filled (or failed to fill).
// NOTE(review): closing braces and several statements are not visible in this view.
// Examples are the enqueue promised for the cancelled path and the bodies of the
// queue_full branches, so the exact branch boundaries cannot be confirmed here.
void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext* reader,
BufferDescriptor* buffer) {
unique_lock<mutex> reader_lock(reader->lock_);
// Per-disk bookkeeping for the disk queue that performed this read.
DiskIoRequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
DCHECK(reader->Validate()) << endl << reader->DebugString();
DCHECK_GT(state.num_threads_in_op(), 0);
DCHECK(buffer->buffer_ != NULL);
if (reader->state_ == DiskIoRequestContext::Cancelled) {
DCHECK(reader->Validate()) << endl << reader->DebugString();
// Only non-client buffers go to FreeBufferMemory(). Presumably client buffers are
// owned by the caller; confirm.
if (!buffer->is_client_buffer()) FreeBufferMemory(buffer);
buffer->buffer_ = NULL;
// Enqueue the buffer to use the scan range's buffer cleanup path.
DCHECK_EQ(reader->state_, DiskIoRequestContext::Active);
DCHECK(buffer->buffer_ != NULL);
// Update the reader's scan ranges. There are three cases here:
// 1. Read error
// 2. End of scan range
// 3. Middle of scan range
if (!buffer->status_.ok()) {
// Error case: drop the data (freeing memory that is not a client buffer) and flag
// the buffer as end of scan range.
if (!buffer->is_client_buffer()) FreeBufferMemory(buffer);
buffer->buffer_ = NULL;
buffer->eosr_ = true;
} else if (buffer->eosr_) {
// After calling EnqueueBuffer(), it is no longer valid to read from buffer.
// Store the state we need before calling EnqueueBuffer().
bool eosr = buffer->eosr_;
ScanRange* scan_range = buffer->scan_range_;
bool is_cached = buffer->is_cached();
// The result is kept as queue_full. Presumably it reports that the range's buffer
// queue is now full; confirm at ScanRange::EnqueueBuffer().
bool queue_full = scan_range->EnqueueBuffer(buffer);
if (eosr) {
// For cached buffers, we can't close the range until the cached buffer is returned.
// Close() is called from DiskIoMgr::ReturnBuffer().
if (!is_cached) scan_range->Close();
} else {
// Range not finished yet: what happens next depends on queue_full.
// NOTE(review): neither branch below has a visible body in this view.
if (queue_full) {
} else {
// Main loop of a disk thread.
// It repeatedly obtains the next request range for `disk_queue` via
// GetNextRequestRange(). It then dispatches the range to ReadRange() for READ ranges
// or to Write() for WRITE ranges.
// @param disk_queue  the queue this thread services.
// NOTE(review): the body of the `if (!GetNextRequestRange(...))` branch is not visible
// in this view, and neither are the closing braces. Per the comment below, that
// branch presumably handles system shutdown.
void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
// The thread waits until there is work or the entire system is being shut down.
// If there is work, performs the read or write requested and re-enqueues the
// requesting context.
// Locks are not taken when reading from or writing to disk.
// The main loop has three parts:
// 1. GetNextRequestContext(): get the next request context (read or write) to
// process and dequeue it.
// 2. For the dequeued request, gets the next scan- or write-range to process and
// re-enqueues the request.
// 3. Perform the read or write as specified.
// Cancellation checking needs to happen in both steps 1 and 3.
while (true) {
// NOTE(review): the trailing ";;" on the next line is a harmless empty statement.
DiskIoRequestContext* worker_context = NULL;;
RequestRange* range = NULL;
if (!GetNextRequestRange(disk_queue, &range, &worker_context)) {
// Dispatch on the range's type. Reads are scan ranges; anything else must be a
// write range.
if (range->request_type() == RequestType::READ) {
ReadRange(disk_queue, worker_context, static_cast<ScanRange*>(range));
} else {
DCHECK(range->request_type() == RequestType::WRITE);
Write(worker_context, static_cast<WriteRange*>(range));
// This function reads the specified scan range associated with the
// specified reader context and disk queue.
// It performs a single Read() call. The target buffer is either the range's client
// buffer, or a buffer of min(bytes remaining, max_buffer_size_) obtained from
// TryAllocateNextBufferForRange(); the function returns early if no buffer is
// available. The filled buffer is then passed to HandleReadFinished().
// NOTE(review): several statements and all closing braces are missing from this view,
// for example the bodies of the counter-update ifs.
void DiskIoMgr::ReadRange(DiskQueue* disk_queue, DiskIoRequestContext* reader,
ScanRange* range) {
// Bytes of this scan range not read yet; the range must not already be finished.
int64_t bytes_remaining = range->len_ - range->bytes_read_;
DCHECK_GT(bytes_remaining, 0);
BufferDescriptor* buffer_desc = NULL;
if (range->external_buffer_tag_ == ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
// Read directly into the buffer the client supplied for this range.
// NOTE(review): the fourth argument below reads `range->,` with no member name, so
// it does not compile as shown. It presumably named the client buffer's data
// pointer (range->client_buffer_.<field>); confirm against the full source.
buffer_desc = GetBufferDesc(
reader, NULL, range, range->, range->client_buffer_.len);
} else {
// Need to allocate a buffer to read into.
// Never request more than max_buffer_size_, nor more than is left to read.
int64_t buffer_size = ::min(bytes_remaining, static_cast<int64_t>(max_buffer_size_));
// NULL means no buffer was handed out. Under low memory,
// TryAllocateNextBufferForRange() returns NULL when the context is cancelled or the
// range already has a queued buffer. Either way there is nothing to read now.
buffer_desc = TryAllocateNextBufferForRange(disk_queue, reader, range, buffer_size);
if (buffer_desc == NULL) return;
// No locks in this section. Only working on local vars. We don't want to hold a
// lock across the read call.
buffer_desc->status_ = range->Open();
if (buffer_desc->status_.ok()) {
// Update counters.
if (reader->active_read_thread_counter_) {
if (reader->disks_accessed_bitmap_) {
// This disk's bit in the reader's disks-accessed bitmap.
// NOTE(review): 1LL << disk_id is undefined behaviour for disk_id >= 64 or < 0.
// Presumably disk ids stay below 64; confirm.
int64_t disk_bit = 1LL << disk_queue->disk_id;
// Read() writes its outputs into the descriptor's len_ and eosr_ fields.
buffer_desc->status_ = range->Read(buffer_desc->buffer_, buffer_desc->buffer_len_,
&buffer_desc->len_, &buffer_desc->eosr_);
// Offset of this buffer's data within the scan range. This assumes Read() has
// already advanced range->bytes_read_ by len_.
buffer_desc->scan_range_offset_ = range->bytes_read_ - buffer_desc->len_;
if (reader->bytes_read_counter_ != NULL) {
COUNTER_ADD(reader->bytes_read_counter_, buffer_desc->len_);
COUNTER_ADD(&total_bytes_read_counter_, buffer_desc->len_);
if (reader->active_read_thread_counter_) {
// Finished read, update reader/disk based on the results
HandleReadFinished(disk_queue, reader, buffer_desc);
// Tries to obtain a buffer of `buffer_size` for the next read of `range`, taking
// memory pressure on reader->mem_tracker_ into account.
// Memory counts as low when SpareCapacity() is not above LOW_MEMORY, checked twice.
// Under low memory it returns NULL in two cases:
// - the reader context is Cancelled;
// - the range already has a ready buffer queued (the range is then marked
//   blocked_on_queue_).
// Otherwise it returns a buffer from GetFreeBuffer().
// NOTE(review): closing braces and some statements are not visible in this view, for
// example the GC step named in the comment below. The branch structure, including the
// return path when memory is sufficient, cannot be confirmed here.
DiskIoMgr::BufferDescriptor* DiskIoMgr::TryAllocateNextBufferForRange(
DiskQueue* disk_queue, DiskIoRequestContext* reader, ScanRange* range,
int64_t buffer_size) {
DCHECK(reader->mem_tracker_ != NULL);
bool enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
if (!enough_memory) {
// Low memory, GC all the buffers and try again.
// NOTE(review): no GC call is visible before this re-check in this view.
enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
if (!enough_memory) {
// NOTE(review): `state` is not used in the visible lines of this function.
DiskIoRequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
unique_lock<mutex> reader_lock(reader->lock_);
// Just grabbed the reader lock, check for cancellation.
if (reader->state_ == DiskIoRequestContext::Cancelled) {
DCHECK(reader->Validate()) << endl << reader->DebugString();
// NOTE(review): this DCHECK repeats the previous one. A statement between them may
// be missing from this view.
DCHECK(reader->Validate()) << endl << reader->DebugString();
return NULL;
if (!range->ready_buffers_.empty()) {
// We have memory pressure and this range doesn't need another buffer
// (it already has one queued). Skip this range and pick it up later.
range->blocked_on_queue_ = true;
return NULL;
} else {
// We need to get a buffer anyway since there are none queued. The query
// is likely to fail due to mem limits but there's nothing we can do about that
// now.
BufferDescriptor* buffer_desc = GetFreeBuffer(reader, range, buffer_size);
DCHECK(buffer_desc != NULL);
return buffer_desc;
// Performs the write described by `write_range`:
// 1. open() the target file, creating it if needed with owner read/write permission.
// 2. Wrap the descriptor in a FILE* with fdopen().
// 3. Write the data via WriteRangeHelper().
// 4. Close the stream.
// 5. Report the resulting status through HandleWriteFinished().
// @param writer_context  context that owns the write range.
// @param write_range     file, offset and data to write.
// NOTE(review): when fdopen() fails, no close(fd) is visible on that path, so the
// descriptor appears to leak; confirm. The fclose() error message also omits the
// errno/description that the other error messages include.
void DiskIoMgr::Write(DiskIoRequestContext* writer_context, WriteRange* write_range) {
Status ret_status = Status::OK();
FILE* file_handle = NULL;
// Raw open() syscall will create file if not present when passed these flags.
int fd = open(write_range->file(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR);
if (fd < 0) {
ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
Substitute("Opening '$0' for write failed with errno=$1 description=$2",
write_range->file_, errno, GetStrErrMsg())));
} else {
file_handle = fdopen(fd, "wb");
if (file_handle == NULL) {
// NOTE(review): the argument list of the Substitute() call below is not closed in
// this view; its final argument(s) are missing.
ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
Substitute("fdopen($0, \"wb\") failed with errno=$1 description=$2", fd, errno,
if (file_handle != NULL) {
ret_status = WriteRangeHelper(file_handle, write_range);
// fclose() also closes the underlying fd. Its failure is reported only if the write
// itself succeeded, so the first error wins.
int success = fclose(file_handle);
if (ret_status.ok() && success != 0) {
ret_status = Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
Substitute("fclose($0) failed", write_range->file_)));
HandleWriteFinished(writer_context, write_range, ret_status);
// Writes write_range->len_ bytes from write_range->data_ to `file_handle`, starting
// at write_range->offset().
// Returns an error Status if fseek() fails or if fwrite() writes fewer bytes than
// requested. Otherwise returns Status::OK().
// NOTE(review): the `#ifndef NDEBUG` below has no visible body or #endif in this view.
// The body of the IO_MGR_BYTES_WRITTEN metric check is not visible either.
Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, WriteRange* write_range) {
// Seek to the correct offset and perform the write.
int success = fseek(file_handle, write_range->offset(), SEEK_SET);
if (success != 0) {
return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
Substitute("fseek($0, $1, SEEK_SET) failed with errno=$2 description=$3",
write_range->file_, write_range->offset(), errno, GetStrErrMsg())));
#ifndef NDEBUG
// Debug-build hook keyed on FLAGS_stress_scratch_write_delay_ms (body not visible).
if (FLAGS_stress_scratch_write_delay_ms > 0) {
// fwrite() returns the number of items written. With an item size of 1 that is the
// byte count, so a short write is detected by comparing against len_.
int64_t bytes_written = fwrite(write_range->data_, 1, write_range->len_, file_handle);
if (bytes_written < write_range->len_) {
return Status(ErrorMsg(TErrorCode::RUNTIME_ERROR,
Substitute("fwrite(buffer, 1, $0, $1) failed with errno=$2 description=$3",
write_range->len_, write_range->file_, errno, GetStrErrMsg())));
if (ImpaladMetrics::IO_MGR_BYTES_WRITTEN != NULL) {
return Status::OK();
// Maps a buffer size to its index in free_buffers_.
// The size is first scaled by min_buffer_size_ with BitUtil::Ceil (presumably a
// ceiling division). The index is then the ceiling of log2 of that count. Presumably
// index i therefore holds buffers of min_buffer_size_ << i; confirm.
// @param buffer_size  requested buffer size.
// @return index into free_buffers_, DCHECKed to be within [0, free_buffers_.size()).
int DiskIoMgr::free_buffers_idx(int64_t buffer_size) {
int64_t buffer_size_scaled = BitUtil::Ceil(buffer_size, min_buffer_size_);
int idx = BitUtil::Log2Ceiling64(buffer_size_scaled);
DCHECK_GE(idx, 0);
DCHECK_LT(idx, free_buffers_.size());
return idx;
// Adds `write_range` to `writer`'s request ranges while holding writer->lock_.
// If the writer context is already Cancelled, nothing is added and writer->status_ is
// returned. Otherwise the range is added via AddRequestRange() and Status::OK() is
// returned.
// NOTE(review): the meaning of AddRequestRange()'s `false` argument is not visible
// here; confirm at its definition.
Status DiskIoMgr::AddWriteRange(DiskIoRequestContext* writer, WriteRange* write_range) {
unique_lock<mutex> writer_lock(writer->lock_);
if (writer->state_ == DiskIoRequestContext::Cancelled) {
return writer->status_;
writer->AddRequestRange(write_range, false);
return Status::OK();
// Chooses the disk queue for a range on `file`.
// A range not expected to be local can be routed to one of these remote queues:
// - the remote HDFS queue, for HDFS paths when FLAGS_num_remote_hdfs_io_threads > 0;
// - the S3 queue;
// - the ADLS queue.
// Otherwise it goes to a local disk queue. A disk_id of -1 (unknown) is replaced by a
// value from next_disk_id_.Add(1), and the result is taken modulo num_local_disks().
// @param file            path of the file being accessed.
// @param disk_id         disk hint, or -1 if unknown.
// @param expected_local  whether the caller expects the data to be on a local disk.
// @return the chosen queue's index.
// NOTE(review): closing braces are missing from this view, so the nesting of the S3 and
// ADLS checks cannot be confirmed from the text alone.
// NOTE(review): if next_disk_id_ is a 32-bit counter it will eventually wrap to
// negative values. In C++ a negative disk_id % num_local_disks() is negative, which
// would be an invalid queue index. Confirm the counter's type.
int DiskIoMgr::AssignQueue(const char* file, int disk_id, bool expected_local) {
// If it's a remote range, check for an appropriate remote disk queue.
if (!expected_local) {
if (IsHdfsPath(file) && FLAGS_num_remote_hdfs_io_threads > 0) {
return RemoteDfsDiskId();
if (IsS3APath(file)) return RemoteS3DiskId();
if (IsADLSPath(file)) return RemoteAdlsDiskId();
// Assign to a local disk queue.
DCHECK(!IsS3APath(file)); // S3 is always remote.
DCHECK(!IsADLSPath(file)); // ADLS is always remote.
if (disk_id == -1) {
// disk id is unknown, assign it an arbitrary one.
disk_id = next_disk_id_.Add(1);
// TODO: we need to parse the config for the number of dirs configured for this
// data node.
return disk_id % num_local_disks();
// Returns a file handle for `fname` on `fs`.
// - If file-handle caching is enabled and file_handle_cache_ holds a handle whose
//   mtime equals `mtime`, that handle is reused.
// - A cached handle with a different mtime is deleted.
// - Otherwise a new HdfsCachedFileHandle is heap-allocated; NULL is returned if it
//   fails to open.
// Presumably callers give the handle back through CacheOrCloseFileHandle(); confirm at
// the call sites.
// NOTE(review): the cache-hit-ratio update mentioned below is not visible in this view.
DiskIoMgr::HdfsCachedFileHandle* DiskIoMgr::OpenHdfsFile(const hdfsFS& fs,
const char* fname, int64_t mtime) {
HdfsCachedFileHandle* fh = NULL;
// Check if a cached file handle exists and validate the mtime, if the mtime of the
// cached handle is not matching the mtime of the requested file, reopen.
if (detail::is_file_handle_caching_enabled() && file_handle_cache_.Pop(fname, &fh)) {
if (fh->mtime() == mtime) {
return fh;
VLOG_FILE << "mtime mismatch, closing cached file handle. Closing file=" << fname;
delete fh;
// Update cache hit ratio
fh = new HdfsCachedFileHandle(fs, fname, mtime);
// Check if the file handle was opened correctly
if (!fh->ok()) {
VLOG_FILE << "Opening the file " << fname << " failed.";
delete fh;
return NULL;
return fh;
// Disposes of an HDFS file handle, presumably one returned by OpenHdfsFile().
// The handle is put back into file_handle_cache_ under `fname` only if all of these
// hold: caching is enabled, `close` is false, and hdfsUnbufferFile() returns 0.
// Otherwise the reason is logged and the handle is deleted.
// @param fname  file name used as the cache key.
// @param fid    handle to cache or delete.
// @param close  when true, always delete the handle instead of caching it.
// NOTE(review): the "Clear read statistics" comment below has no matching call visible
// in this view.
void DiskIoMgr::CacheOrCloseFileHandle(const char* fname,
DiskIoMgr::HdfsCachedFileHandle* fid, bool close) {
// Try to unbuffer the handle, on filesystems that do not support this call a non-zero
// return code indicates that the operation was not successful and thus the file is
// closed.
if (detail::is_file_handle_caching_enabled() &&
!close && hdfsUnbufferFile(fid->file()) == 0) {
// Clear read statistics before returning
file_handle_cache_.Put(fname, fid);
} else {
if (close) {
VLOG_FILE << "Closing file=" << fname;
} else {
VLOG_FILE << "FS does not support file handle unbuffering, closing file="
<< fname;
delete fid;