| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #ifndef IMPALA_RUNTIME_IO_DISK_IO_MGR_H |
| #define IMPALA_RUNTIME_IO_DISK_IO_MGR_H |
| |
| #include <vector> |
| |
| #include <boost/thread/mutex.hpp> |
| |
| #include "common/atomic.h" |
| #include "common/hdfs.h" |
| #include "common/status.h" |
| #include "runtime/bufferpool/buffer-pool.h" |
| #include "runtime/io/handle-cache.h" |
| #include "runtime/io/hdfs-monitored-ops.h" |
| #include "runtime/io/local-file-system.h" |
| #include "runtime/io/request-ranges.h" |
| #include "util/aligned-new.h" |
| #include "util/runtime-profile.h" |
| #include "util/thread.h" |
| |
| namespace impala { |
| |
| namespace io { |
| |
| class DataCache; |
| class DiskQueue; |
| |
| /// Manager object that schedules IO for all queries on all disks and remote filesystems |
| /// (such as S3). Each query maps to one or more RequestContext objects, each of which |
| /// has its own queue of scan ranges and/or write ranges. |
| /// |
| /// The API splits up requesting scan/write ranges (non-blocking) and reading the data |
| /// (blocking). The DiskIoMgr has worker threads that will read from and write to |
| /// disk/hdfs/remote-filesystems, allowing interleaving of IO and CPU. This allows us to |
| /// keep all disks and all cores as busy as possible. |
| /// |
| /// All public APIs are thread-safe. It is not valid to call any of the APIs after |
| /// UnregisterContext() returns. |
| /// |
| /// For Readers: |
| /// We can model this problem as a multiple producer (threads for each disk), multiple |
| /// consumer (scan ranges) problem. There are multiple queues that need to be |
| /// synchronized. Conceptually, there are two queues: |
| /// 1. The per disk queue: this contains a queue of readers that need reads. |
| /// 2. The per scan range ready-buffer queue: this contains buffers that have been |
| /// read and are ready for the caller. |
| /// The disk queue contains a queue of readers and is scheduled in a round robin fashion. |
| /// Readers map to scan nodes. The reader then contains a queue of scan ranges. The caller |
| /// asks the IoMgr for the next range to process. The IoMgr then selects the best range |
| /// to read based on disk activity and begins reading and queuing buffers for that range. |
| /// |
| /// For Writers: |
| /// Data is written via RequestContext::AddWriteRange(). This is non-blocking and adds |
| /// a WriteRange to a per-disk queue. After the write is complete, a callback in |
| /// WriteRange is invoked. No memory is allocated within IoMgr for writes and no copies |
| /// are made. It is the responsibility of the client to ensure that the data to be |
| /// written is valid. The file to be written is created if not already present. |
| /// |
| /// There are several key methods for scanning data with the IoMgr. |
| /// 1. RequestContext::StartScanRange(): adds range to the IoMgr to start immediately. |
| /// 2. RequestContext::AddScanRanges(): adds ranges to the IoMgr that the reader wants to |
| /// scan, but does not start them until RequestContext::GetNextUnstartedRange() is called. |
| /// 3. RequestContext::GetNextUnstartedRange(): returns to the caller the next scan range |
| /// it should process. |
| /// 4. ScanRange::GetNext(): returns the next buffer for this range, blocking until |
| /// data is available. |
| /// |
| /// The disk threads do not synchronize with each other. The readers and writers don't |
| /// synchronize with each other. There is a lock and condition variable for each request |
| /// context queue and each disk queue. |
| /// IMPORTANT: whenever both locks are needed, the lock order is to grab the context lock |
| /// before the disk lock. |
| /// |
| /// Scheduling: If there are multiple request contexts with work for a single disk, the |
| /// request contexts are scheduled in round-robin order. Multiple disk threads can |
| /// operate on the same request context. Exactly one request range is processed by a |
| /// disk thread at a time. If there are multiple scan ranges scheduled for a single |
| /// context, these are processed in round-robin order. |
| /// If there are multiple scan and write ranges for a disk, a read is always followed |
| /// by a write, and a write is followed by a read, i.e. reads and writes alternate. |
| /// If multiple write ranges are enqueued for a single disk, they will be processed |
| /// by the disk threads in order, but may complete in any order. No guarantees are made |
| /// on ordering of writes across disks. |
| /// |
| /// Resource Management: the IoMgr is designed to share the available disk I/O capacity |
| /// between many clients and to help use the available I/O capacity efficiently. The IoMgr |
| /// interfaces are designed to let clients manage their own CPU and memory usage while the |
| /// IoMgr manages the allocation of the I/O capacity of different I/O devices to scan |
| /// ranges of different clients. |
| /// |
| /// IoMgr clients may want to work on multiple scan ranges at a time to maximize CPU and |
| /// I/O utilization. Clients can call RequestContext::GetNextUnstartedRange() to start as |
| /// many concurrent scan ranges as required, e.g. from each parallel scanner thread. Once |
| /// a scan range has been returned via GetNextUnstartedRange(), the caller must allocate |
| /// any memory needed for buffering reads, after which the IoMgr wil start to fill the |
| /// buffers with data while the caller concurrently consumes and processes the data. For |
| /// example, the logic in a scanner thread might look like: |
| /// while (more_ranges) |
| /// range = context->GetNextUnstartedRange() |
| /// while (!range.eosr) |
| /// buffer = range.GetNext() |
| /// |
| /// Note that the IoMgr rather than the client is responsible for choosing which scan |
| /// range to process next, which allows optimizations like distributing load across disks. |
| /// |
| /// Buffer Management: |
| /// Buffers for reads are either a) allocated on behalf of the caller with |
| /// AllocateBuffersForRange() ("IoMgr-allocated"), b) cached HDFS buffers if the scan |
| /// range was read from the HDFS cache, or c) a client buffer, large enough to fit the |
| /// whole scan range's data, that is provided by the caller when constructing the |
| /// scan range. |
| /// |
| /// All three kinds of buffers are wrapped in BufferDescriptors before returning to the |
| /// caller. The caller must always call ReturnBuffer() on the buffer descriptor to allow |
| /// recycling of the buffer memory and to release any resources associated with the buffer |
| /// or scan range. |
| /// |
| /// In case a), ReturnBuffer() may re-enqueue the buffer for GetNext() to return again if |
| /// needed. E.g. if 24MB of buffers were allocated to read a 64MB scan range, each buffer |
| /// must be returned multiple times. Callers must be careful to call ReturnBuffer() with |
| /// the previous buffer returned from the range before calling before GetNext() so that |
| /// at least one buffer is available for the I/O mgr to read data into. Calling GetNext() |
| /// when the scan range has no buffers to read data into causes a resource deadlock. |
| /// NB: if the scan range was allocated N buffers, then it's always ok for the caller |
| /// to hold onto N - 1 buffers, but currently the IoMgr doesn't give the caller a way |
| /// to determine the value of N. |
| /// |
| /// If the caller wants to maximize I/O throughput, it can give the range enough memory |
| /// for 3 max-sized buffers per scan range. Having two queued buffers (plus the buffer |
| /// that is currently being processed by the client) gives good performance in most |
| /// scenarios: |
| /// 1. If the consumer is consuming data faster than we can read from disk, then the |
| /// queue will be empty most of the time because the buffer will be immediately |
| /// pulled off the queue as soon as it is added. There will always be an I/O request |
| /// in the disk queue to maximize I/O throughput, which is the bottleneck in this |
| /// case. |
| /// 2. If we can read from disk faster than the consumer is consuming data, the queue |
| /// will fill up and there will always be a buffer available for the consumer to |
| /// read, so the consumer will not block and we maximize consumer throughput, which |
| /// is the bottleneck in this case. |
| /// 3. If the consumer is consuming data at approximately the same rate as we are |
| /// reading from disk, then the steady state is that the consumer is processing one |
| /// buffer and one buffer is in the disk queue. The additional buffer can absorb |
| /// bursts where the producer runs faster than the consumer or the consumer runs |
| /// faster than the producer without blocking either the producer or consumer. |
| /// See IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE. |
| /// |
| /// Caching support: |
| /// Scan ranges contain metadata on whether or not it is cached on the DN. In that |
| /// case, we use the HDFS APIs to read the cached data without doing any copies. For these |
| /// ranges, the reads happen on the caller thread (as opposed to the disk threads). |
| /// It is possible for the cached read APIs to fail, in which case the ranges are then |
| /// queued on the disk threads and behave identically to the case where the range |
| /// is not cached. |
| /// Resources for these ranges are also not accounted against the reader because none |
| /// are consumed. |
| /// While a cached block is being processed, the block is mlocked. We want to minimize |
| /// the time the mlock is held. |
| /// - HDFS will time us out if we hold onto the mlock for too long |
| /// - Holding the lock prevents uncaching this file due to a caching policy change. |
| /// Therefore, we only issue the cached read when the caller is ready to process the |
| /// range (GetNextUnstartedRange()) instead of when the ranges are issued. This guarantees |
| /// that there will be a CPU available to process the buffer and any throttling we do with |
| /// the number of scanner threads properly controls the amount of files we mlock. |
| /// With cached scan ranges, we cannot close the scan range until the cached buffer |
| /// is returned (HDFS does not allow this). We therefore need to defer the close until |
| /// the cached buffer is returned (ReturnBuffer()). |
| /// |
| /// Remote filesystem support (e.g. S3): |
| /// Remote filesystems are modeled as "remote disks". That is, there is a seperate disk |
| /// queue for each supported remote filesystem type. In order to maximize throughput, |
| /// multiple connections are opened in parallel by having multiple threads running per |
| /// queue. Also note that reading from a remote filesystem service can be more CPU |
| /// intensive than local disk/hdfs because of non-direct I/O and SSL processing, and can |
| /// be CPU bottlenecked especially if not enough I/O threads for these queues are |
| /// started. |
| /// |
| /// Remote filesystem data caching: |
| /// To reduce latency and avoid being network bound when reading from remote filesystems, |
| /// a data cache can be optionally enabled (via --data_cache_config) for caching data read |
| /// for remote scan ranges on local storage. The cache is independent of file formats. |
| /// It's merely caching chunks of file blocks directly on local storage to avoid |
| /// fetching them over network. Please see data-cache.h for details. |
| /// |
| /// TODO: We should implement more sophisticated resource management. Currently readers |
| /// are the unit of scheduling and we attempt to distribute IOPS between them. Instead |
| /// it would be better to have policies based on queries, resource pools, etc. |
| /// TODO: IoMgr should be able to request additional scan ranges from the coordinator |
| /// to help deal with stragglers. |
| /// |
| /// Structure of the Implementation: |
| /// - All client APIs are defined in this file, request-ranges.h and request-context.h. |
| /// Clients can include only the files that they need. |
| /// - Some internal classes are defined in disk-io-mgr-internal.h |
| /// - ScanRange APIs are implemented in scan-range.cc |
| /// This contains the ready buffer queue logic |
| /// - RequestContext APIs are implemented in request-context.cc |
| /// This contains the logic for picking scan ranges for a reader. |
| /// - Disk Thread and general APIs are implemented in disk-io-mgr.cc. |
| /// - The handle cache is implemented in handle-cache{.inline,}.h |
| |
| // This is cache line aligned because the FileHandleCache needs cache line alignment |
| // for its partitions. |
| class DiskIoMgr : public CacheLineAligned { |
| public: |
| /// Create a DiskIoMgr object. This constructor is only used for testing. |
| /// - num_disks: The number of disks the IoMgr should use. This is used for testing. |
| /// Specify 0, to have the disk IoMgr query the os for the number of disks. |
| /// - threads_per_rotational_disk: number of read threads to create per rotational |
| /// disk. This is also the max queue depth. |
| /// - threads_per_solid_state_disk: number of read threads to create per solid state |
| /// disk. This is also the max queue depth. |
| /// - min_buffer_size: minimum io buffer size (in bytes). Will be rounded down to the |
| // nearest power-of-two. |
| /// - max_buffer_size: maximum io buffer size (in bytes). Will be rounded up to the |
| /// nearest power-of-two. Also the max read size. |
| DiskIoMgr(int num_disks, int threads_per_rotational_disk, |
| int threads_per_solid_state_disk, int64_t min_buffer_size, int64_t max_buffer_size); |
| |
| /// Create DiskIoMgr with default configs. |
| DiskIoMgr(); |
| |
| /// Clean up all threads and resources. This is mostly useful for testing since |
| /// for impalad, this object is never destroyed. |
| virtual ~DiskIoMgr(); |
| |
| /// Initialize the IoMgr. Must be called once before any of the other APIs. |
| Status Init() WARN_UNUSED_RESULT; |
| |
| |
| /// Allocates tracking structure for a request context. |
| /// Register a new request context and return it to the caller. The caller must call |
| /// UnregisterContext() for each context. |
| std::unique_ptr<RequestContext> RegisterContext(); |
| |
| /// Unregisters context from the disk IoMgr by first cancelling it then blocking until |
| /// all references to the context are removed from I/O manager internal data structures. |
| /// This must be called for every RegisterContext() to ensure that the context object |
| /// can be safely destroyed. It is invalid to add more request ranges to 'context' after |
| /// after this call. This call blocks until all the disk threads have finished cleaning |
| /// up. |
| void UnregisterContext(RequestContext* context); |
| |
| /// Allocates up to 'max_bytes' buffers to read the data from 'range' into and schedules |
| /// the range. Called after StartScanRange() or reader->GetNextUnstartedRange() |
| /// returns *needs_buffers=true. |
| /// |
| /// The buffer sizes are chosen based on range->len(). 'max_bytes' must be >= |
| /// min_read_buffer_size() so that at least one buffer can be allocated. The caller |
| /// must ensure that 'bp_client' has at least 'max_bytes' unused reservation. Returns ok |
| /// if the buffers were successfully allocated and the range was scheduled. |
| /// |
| /// Setting 'max_bytes' to IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE * max_buffer_size() |
| /// will typically maximize I/O throughput. See the "Buffer Management" section of |
| /// the class comment for explanation. |
| Status AllocateBuffersForRange( |
| BufferPool::ClientHandle* bp_client, ScanRange* range, int64_t max_bytes); |
| |
| /// Determine which disk queue this file should be assigned to. Returns an index into |
| /// disk_queues_. The disk_id is the volume ID for the local disk that holds the |
| /// files, or -1 if unknown. Flag expected_local is true iff this impalad is |
| /// co-located with the datanode for this file. |
| int AssignQueue(const char* file, int disk_id, bool expected_local); |
| |
| int64_t min_buffer_size() const { return min_buffer_size_; } |
| int64_t max_buffer_size() const { return max_buffer_size_; } |
| |
| /// Returns the total number of disk queues (both local and remote). |
| int num_total_disks() const { return disk_queues_.size(); } |
| |
| /// Returns the total number of remote "disk" queues. |
| int num_remote_disks() const { return REMOTE_NUM_DISKS; } |
| |
| /// Returns the number of local disks attached to the system. |
| int num_local_disks() const { return num_total_disks() - num_remote_disks(); } |
| |
| /// The disk ID (and therefore disk_queues_ index) used for DFS accesses. |
| int RemoteDfsDiskId() const { return num_local_disks() + REMOTE_DFS_DISK_OFFSET; } |
| |
| /// The disk ID (and therefore disk_queues_ index) used for S3 accesses. |
| int RemoteS3DiskId() const { return num_local_disks() + REMOTE_S3_DISK_OFFSET; } |
| |
| /// The disk ID (and therefore disk_queues_ index) used for ABFS accesses. |
| int RemoteAbfsDiskId() const { return num_local_disks() + REMOTE_ABFS_DISK_OFFSET; } |
| |
| /// The disk ID (and therefore disk_queues_ index) used for ADLS accesses. |
| int RemoteAdlsDiskId() const { return num_local_disks() + REMOTE_ADLS_DISK_OFFSET; } |
| |
| /// Dumps the disk IoMgr queues (for readers and disks) |
| std::string DebugString(); |
| |
| /// Validates the internal state is consistent. This is intended to only be used |
| /// for debugging. |
| bool Validate() const; |
| |
| /// Given a FS handle, name and last modified time of the file, construct a new |
| /// ExclusiveHdfsFileHandle and return it via 'fid'. This records the time spent |
| /// opening the handle in 'reader' and counts this as a cache miss. In case of an |
| /// error, returns status and 'fid' is untouched. |
| Status GetExclusiveHdfsFileHandle(const hdfsFS& fs, |
| std::string* fname, int64_t mtime, RequestContext* reader, |
| std::unique_ptr<ExclusiveHdfsFileHandle>& fid) WARN_UNUSED_RESULT; |
| |
| /// Releases an exclusive file handle, destroying it |
| void ReleaseExclusiveHdfsFileHandle(std::unique_ptr<ExclusiveHdfsFileHandle> fid); |
| |
| /// Given a FS handle, name and last modified time of the file, gets a |
| /// CachedHdfsFileHandle from the file handle cache and returns it via 'fid'. |
| /// Records the time spent opening the handle in 'reader'. On success, records |
| /// statistics about whether this was a cache hit or miss in the 'reader' as well as |
| /// at the system level. In case of an error, returns status and 'fid' is untouched. |
| Status GetCachedHdfsFileHandle(const hdfsFS& fs, |
| std::string* fname, int64_t mtime, RequestContext* reader, |
| CachedHdfsFileHandle** fid) WARN_UNUSED_RESULT; |
| |
| /// Releases a file handle back to the file handle cache when it is no longer in use. |
| void ReleaseCachedHdfsFileHandle(std::string* fname, CachedHdfsFileHandle* fid); |
| |
| /// Reopens a file handle by destroying the file handle and getting a fresh |
| /// file handle from the cache. Records the time spent reopening the handle |
| /// in 'reader'. Returns an error if the file could not be reopened. |
| Status ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime, |
| RequestContext* reader, CachedHdfsFileHandle** fid) WARN_UNUSED_RESULT; |
| |
| // Function to change the underlying LocalFileSystem object used for disk I/O. |
| // DiskIoMgr will also take responsibility of the received LocalFileSystem pointer. |
| // It is only for testing purposes to use a fault injected version of LocalFileSystem. |
| void SetLocalFileSystem(std::unique_ptr<LocalFileSystem> fs) { |
| local_file_system_ = std::move(fs); |
| } |
| |
| /// "Disk" queue offsets for remote accesses. Offset 0 corresponds to |
| /// disk ID (i.e. disk_queue_ index) of num_local_disks(). |
| enum { |
| REMOTE_DFS_DISK_OFFSET = 0, |
| REMOTE_S3_DISK_OFFSET, |
| REMOTE_ADLS_DISK_OFFSET, |
| REMOTE_ABFS_DISK_OFFSET, |
| REMOTE_NUM_DISKS |
| }; |
| |
| /// Compute the ideal reservation for processing a scan range of 'scan_range_len' bytes. |
| /// See "Buffer Management" in the class comment for explanation. |
| int64_t ComputeIdealBufferReservation(int64_t scan_range_len); |
| |
| /// The ideal number of max-sized buffers per scan range to maximise throughput. |
| /// See "Buffer Management" in the class comment for explanation. |
| static const int64_t IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE = 3; |
| |
| /// Validates that range is correctly initialized. Return an error status if there |
| /// is something invalid about the scan range. |
| Status ValidateScanRange(ScanRange* range) WARN_UNUSED_RESULT; |
| |
| DataCache* remote_data_cache() { return remote_data_cache_.get(); } |
| |
| private: |
| DISALLOW_COPY_AND_ASSIGN(DiskIoMgr); |
| friend class DiskIoMgrTest_Buffers_Test; |
| friend class DiskIoMgrTest_BufferSizeSelection_Test; |
| friend class DiskIoMgrTest_VerifyNumThreadsParameter_Test; |
| |
| ///////////////////////////////////////// |
| /// BEGIN: private members that are accessed by other io:: classes |
| friend class DiskQueue; |
| friend class ScanRange; |
| friend class HdfsFileReader; |
| |
| /// Write the specified range to disk and calls writer_context->WriteDone() when done. |
| /// Responsible for opening and closing the file that is written. |
| void Write(RequestContext* writer_context, WriteRange* write_range); |
| |
| struct hadoopRzOptions* cached_read_options() { return cached_read_options_; } |
| |
| /// END: private members that are accessed by other io:: classes |
| ///////////////////////////////////////// |
| |
| // Handles the low level I/O functionality. |
| std::unique_ptr<LocalFileSystem> local_file_system_; |
| |
| /// Number of worker(read) threads per rotational disk. Also the max depth of queued |
| /// work to the disk. |
| const int num_io_threads_per_rotational_disk_; |
| |
| /// Number of worker(read) threads per solid state disk. Also the max depth of queued |
| /// work to the disk. |
| const int num_io_threads_per_solid_state_disk_; |
| |
| /// Maximum read size. This is also the maximum size of each allocated buffer. |
| const int64_t max_buffer_size_; |
| |
| /// The minimum size of each read buffer. Must be >= BufferPool::min_buffer_len(). |
| const int64_t min_buffer_size_; |
| |
| /// Thread group containing all the worker threads. |
| ThreadGroup disk_thread_group_; |
| |
| /// Options object for cached hdfs reads. Set on startup and never modified. |
| struct hadoopRzOptions* cached_read_options_ = nullptr; |
| |
| /// Per disk queues. This is static and created once at Init() time. One queue is |
| /// allocated for each local disk on the system and for each remote filesystem type. |
| /// It is indexed by disk id. |
| std::vector<DiskQueue*> disk_queues_; |
| |
| /// The next disk queue to write to if the actual 'disk_id_' is unknown (i.e. the file |
| /// is not associated with a particular local disk or remote queue). Used to implement |
| /// round-robin assignment for that case. |
| static AtomicInt32 next_disk_id_; |
| |
| /// Thread pool used to implement timeouts for HDFS operations. |
| HdfsMonitor hdfs_monitor_; |
| |
| // Caching structure that maps file names to cached file handles. The cache has an upper |
| // limit of entries defined by FLAGS_max_cached_file_handles. Evicted cached file |
| // handles are closed. |
| FileHandleCache file_handle_cache_; |
| |
| /// Helper method to write a range using the specified FILE handle. Returns Status:OK |
| /// if the write succeeded, or a RUNTIME_ERROR with an appropriate message otherwise. |
| /// Does not open or close the file that is written. |
| Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range) WARN_UNUSED_RESULT; |
| |
| /// Helper for AllocateBuffersForRange() to compute the buffer sizes for a scan range |
| /// with length 'scan_range_len', given that 'max_bytes' of memory should be allocated. |
| std::vector<int64_t> ChooseBufferSizes(int64_t scan_range_len, int64_t max_bytes); |
| |
| /// Singleton IO data cache for remote reads. If configured, it will be probed for all |
| /// non-local reads and data read from remote data nodes will be stored in it. If not |
| /// configured, this would be NULL. |
| std::unique_ptr<DataCache> remote_data_cache_; |
| }; |
| } |
| } |
| |
| #endif |