| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <cstdint> |
| #include <deque> |
| #include <functional> |
| #include <mutex> |
| |
| #include <boost/thread/shared_mutex.hpp> |
| #include <gtest/gtest_prod.h> // for FRIEND_TEST |
| |
| #include "common/atomic.h" |
| #include "common/hdfs.h" |
| #include "common/status.h" |
| #include "runtime/bufferpool/buffer-pool.h" |
| #include "runtime/io/disk-file.h" |
| #include "util/condition-variable.h" |
| #include "util/internal-queue.h" |
| |
| namespace impala { |
| namespace io { |
| class DiskIoMgr; |
| class DiskQueue; |
| class ExclusiveHdfsFileHandle; |
| class FileReader; |
| class FileWriter; |
| class RequestContext; |
| class ScanRange; |
| |
| /// Buffer struct that is used by the caller and IoMgr to pass read buffers. |
| /// It is is expected that only one thread has ownership of this object at a |
| /// time. |
| class BufferDescriptor { |
| public: |
| /// Create a buffer descriptor allocated from the buffer pool. Public to |
| /// allow access by DiskIoMgr. |
| BufferDescriptor(ScanRange* scan_range, BufferPool::ClientHandle* bp_client, |
| BufferPool::BufferHandle handle); |
| |
| ~BufferDescriptor() { |
| DCHECK(buffer_ == nullptr); // Check we didn't leak a buffer. |
| } |
| |
| ScanRange* scan_range() { return scan_range_; } |
| uint8_t* buffer() { return buffer_; } |
| int64_t buffer_len() { return buffer_len_; } |
| int64_t len() { return len_; } |
| bool eosr() { return eosr_; } |
| |
| private: |
| DISALLOW_COPY_AND_ASSIGN(BufferDescriptor); |
| /// This class is tightly coupled with ScanRange. Making them friends is easiest. |
| friend class ScanRange; |
| friend class HdfsFileReader; |
| |
| /// Create a buffer descriptor for a range and data buffer. |
| BufferDescriptor(ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len); |
| |
| /// Return true if this is a cached buffer owned by HDFS. |
| bool is_cached() const; |
| |
| /// Return true if this is a buffer owner by the client that was provided when |
| /// constructing the scan range. |
| bool is_client_buffer() const; |
| |
| /// Releases memory resources for this buffer. If the buffer was allocated with |
| /// DiskIoMgr::AllocateBuffersForRange(), frees the buffer. Otherwise (e.g. a client |
| /// or HDFS cache buffer), just prepares this descriptor to be destroyed. After this |
| /// is called, buffer() is NULL. Does not acquire 'lock_'. |
| void Free(); |
| |
| /// Scan range that this buffer is for. Non-NULL when initialised. |
| ScanRange* const scan_range_; |
| |
| /// Buffer for the read contents. Must be set to NULL in Free() before destruction |
| /// of a descriptor. |
| uint8_t* buffer_; |
| |
| /// length of buffer_. For buffers from cached reads, the length is 0. |
| const int64_t buffer_len_; |
| |
| /// length of read contents |
| int64_t len_ = 0; |
| |
| /// true if the current scan range is complete |
| bool eosr_ = false; |
| |
| // Handle to an allocated buffer and the client used to allocate it buffer. Only used |
| // for non-external buffers. |
| BufferPool::ClientHandle* bp_client_ = nullptr; |
| BufferPool::BufferHandle handle_; |
| }; |
| |
| /// The request type, read or write associated with a request range. |
| /// Ohter than those, request type file_upload is the type for remote file operation |
| /// ranges, for doing file uploading to the remote. |
| struct RequestType { |
| enum type { |
| READ, |
| WRITE, |
| FILE_UPLOAD, |
| }; |
| }; |
| |
| /// ReadOutput describes the possible outcomes of the DoRead() function. |
| enum class ReadOutcome { |
| // The last (eosr) buffer was successfully enqueued. |
| SUCCESS_EOSR, |
| // The buffer was successfully enqueued but we are not at eosr and can schedule |
| // the next read. |
| SUCCESS_NO_EOSR, |
| // The scan range is blocked waiting for the next buffer. |
| BLOCKED_ON_BUFFER, |
| // The scan range is cancelled (either by caller or because of an error). No more |
| // reads will be scheduled. |
| CANCELLED |
| }; |
| |
| /// Represents a contiguous sequence of bytes in a single file. |
| /// This is the common base class for read and write IO requests - ScanRange and |
| /// WriteRange. Each disk thread processes exactly one RequestRange at a time. |
| class RequestRange : public InternalQueue<RequestRange>::Node { |
| public: |
| hdfsFS fs() const { return fs_; } |
| const char* file() const { return file_.c_str(); } |
| std::string* file_string() { return &file_; } |
| int64_t offset() const { return offset_; } |
| int64_t len() const { return len_; } |
| int disk_id() const { return disk_id_; } |
| RequestType::type request_type() const { return request_type_; } |
| |
| protected: |
| RequestRange(RequestType::type request_type, int disk_id = -1) |
| : fs_(nullptr), |
| offset_(-1), |
| len_(-1), |
| disk_id_(disk_id), |
| request_type_(request_type) {} |
| |
| /// Hadoop filesystem that contains file_, or set to nullptr for local filesystem. |
| hdfsFS fs_; |
| |
| /// Path to file being read or written. |
| std::string file_; |
| |
| /// Offset within file_ being read or written. |
| int64_t offset_; |
| |
| /// Length of data read or written. |
| int64_t len_; |
| |
| /// Id of disk queue containing byte range. |
| int disk_id_; |
| |
| /// The type of IO request, READ or WRITE. |
| RequestType::type request_type_; |
| }; |
| |
| /// Param struct for different combinations of buffering. |
| struct BufferOpts { |
| public: |
| |
| /// Different caching options available for a scan range. |
| /// |
| /// If USE_HDFS_CACHE is set, a read will first be probed against the HDFS cache for |
| /// any hits. If there is a miss, it will fall back to reading from the underlying |
| /// storage. Please note that HDFS cache are only used for local reads. Reads from |
| /// remote locations (e.g. another HDFS data node) will not be cached in the HDFS cache. |
| /// |
| /// If USE_DATA_CACHE is set, any read from the underlying storage will first be probed |
| /// against the data cache. If there is a cache miss in data cache, data will be |
| /// inserted into the data cache upon IO completion. The data cache is usually used for |
| /// caching non-local HDFS data (e.g. remote HDFS data or S3). |
| enum { |
| NO_CACHING = 0, |
| USE_HDFS_CACHE = 1 << 0, |
| USE_DATA_CACHE = 1 << 2 |
| }; |
| |
| /// Set options for a read into an IoMgr-allocated or HDFS-cached buffer. |
| /// 'cache_options' specifies the caching options used. Please see comments |
| /// of 'USE_HDFS_CACHE' and 'USE_DATA_CACHE' for details of the caching options. |
| BufferOpts(int cache_options) |
| : cache_options_(cache_options), |
| client_buffer_(nullptr), |
| client_buffer_len_(-1) {} |
| |
| /// Set options for an uncached read into an IoMgr-allocated buffer. |
| static BufferOpts Uncached() { |
| return BufferOpts(NO_CACHING, nullptr, -1); |
| } |
| |
| /// Set options to read the entire scan range into 'client_buffer'. The length of the |
| /// buffer, 'client_buffer_len', must fit the entire scan range. HDFS caching shouldn't |
| /// be enabled in this case. |
| static BufferOpts ReadInto(uint8_t* client_buffer, int64_t client_buffer_len, |
| int cache_options) { |
| DCHECK_EQ(cache_options & USE_HDFS_CACHE, 0); |
| return BufferOpts(cache_options, client_buffer, client_buffer_len); |
| } |
| |
| /// Use only when you don't want to to read the entire scan range, but only sub-ranges |
| /// in it. In this case you can copy the relevant parts from the HDFS cache into the |
| /// client buffer. The length of the buffer, 'client_buffer_len' must fit the |
| /// concatenation of all the sub-ranges. |
| static BufferOpts ReadInto(int cache_options, uint8_t* client_buffer, |
| int64_t client_buffer_len) { |
| return BufferOpts(cache_options, client_buffer, client_buffer_len); |
| } |
| |
| private: |
| friend class ScanRange; |
| friend class HdfsFileReader; |
| FRIEND_TEST(DataCacheTest, TestBasics); |
| |
| BufferOpts(int cache_options, uint8_t* client_buffer, |
| int64_t client_buffer_len) |
| : cache_options_(cache_options), |
| client_buffer_(client_buffer), |
| client_buffer_len_(client_buffer_len) {} |
| |
| /// Specify options to enable HDFS and data caches. |
| const int cache_options_; |
| |
| /// A destination buffer provided by the client, nullptr and -1 if no buffer. |
| uint8_t* const client_buffer_; |
| const int64_t client_buffer_len_; |
| }; |
| |
| /// ScanRange description. The caller must call Reset() to initialize the fields |
| /// before calling AddScanRanges(). The private fields are used internally by |
| /// the IoMgr. |
| class ScanRange : public RequestRange { |
| public: |
| ScanRange(); |
| |
| virtual ~ScanRange(); |
| |
| /// Defines an internal range within this ScanRange. |
| struct SubRange { |
| int64_t offset; |
| int64_t length; |
| }; |
| |
| /// Allocate a scan range object stored in the given 'obj_pool' and calls Reset() on it |
| /// with the rest of the input variables. |
| static ScanRange* AllocateScanRange(ObjectPool* obj_pool, hdfsFS fs, const char* file, |
| int64_t len, int64_t offset, std::vector<SubRange>&& sub_ranges, void* metadata, |
| int disk_id, bool expected_local, int64_t mtime, const BufferOpts& buffer_opts); |
| |
| /// Resets this scan range object with the scan range description. The scan range |
| /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the |
| /// local filesystem). The scan range must be non-empty and fall within the file bounds |
| /// (len > 0 and offset >= 0 and offset + len <= file_length). 'disk_id' is the disk |
| /// queue to add the range to. If 'expected_local' is true, a warning is generated if |
| /// the read did not come from a local disk. 'mtime' is the last modification time for |
| /// 'file'; the mtime must change when the file changes. |
| /// 'buffer_opts' specifies buffer management options - see the DiskIoMgr class comment |
| /// and the BufferOpts comments for details. |
| /// 'meta_data' is an arbitrary client-provided pointer for any auxiliary data. |
| /// 'disk_file' and 'disk_buffer_file' provides methods to confirm and guarantee the |
| /// status of the physical file is available to access for the scanning. They are used |
| /// for scanning a file related to spilling to a remote filesystem, both should be |
| /// non-null, for other cases, like scanning a file related to spilling to local |
| /// filesytem, both would not be used and should be null. |
| /// TODO: IMPALA-4249: clarify if a ScanRange can be reused after Reset(). Currently |
| /// it is not generally safe to do so, but some unit tests reuse ranges after |
| /// successfully reading to eos. |
| void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id, |
| bool expected_local, int64_t mtime, const BufferOpts& buffer_opts, |
| void* meta_data = nullptr, DiskFile* disk_file = nullptr, |
| DiskFile* disk_buffer_file = nullptr); |
| |
| /// Same as above, but it also adds sub-ranges. No need to merge contiguous sub-ranges |
| /// in advance, as this method will do the merge. |
| void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id, |
| bool expected_local, int64_t mtime, const BufferOpts& buffer_opts, |
| std::vector<SubRange>&& sub_ranges, void* meta_data = nullptr, |
| DiskFile* disk_file = nullptr, DiskFile* disk_buffer_file = nullptr); |
| |
| void* meta_data() const { return meta_data_; } |
| int cache_options() const { return cache_options_; } |
| bool UseHdfsCache() const { return (cache_options_ & BufferOpts::USE_HDFS_CACHE) != 0; } |
| bool UseDataCache() const { return (cache_options_ & BufferOpts::USE_DATA_CACHE) != 0; } |
| bool read_in_flight() const { return read_in_flight_; } |
| bool expected_local() const { return expected_local_; } |
| int64_t bytes_to_read() const { return bytes_to_read_; } |
| bool use_local_buffer() const { return use_local_buffer_; } |
| |
| /// Returns the next buffer for this scan range. buffer is an output parameter. |
| /// This function blocks until a buffer is ready or an error occurred. If this is |
| /// called when all buffers have been returned, *buffer is set to nullptr and Status::OK |
| /// is returned. If this returns buffer->eos() or an error status, then all buffers |
| /// owned by the scan range were either returned to callers of GetNext() or freed. |
| /// Only one thread can be in GetNext() at any time. |
| Status GetNext(std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT; |
| |
| /// Returns the buffer to the scan range. This must be called for every buffer |
| /// returned by GetNext(). After calling this, the buffer descriptor is invalid |
| /// and cannot be accessed. |
| void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer); |
| |
| /// Cancel this scan range. This waits for any in-flight read operations to complete, |
| /// cleans up all buffers owned by the scan range (i.e. queued or unused buffers) |
| /// and wakes up any threads blocked on GetNext(). Status is a non-ok status with the |
| /// reason the range was cancelled, e.g. CANCELLED_INTERNALLY if the range was cancelled |
| /// because it was not needed, or another error if an error was encountered while |
| /// scanning the range. Status is returned to the any callers of GetNext(). |
| void Cancel(const Status& status); |
| |
| /// return a descriptive string for debug. |
| std::string DebugString() const; |
| |
| /// Non-HDFS files (e.g. local files) do not use mtime, so they should use this known |
| /// bogus mtime. |
| const static int64_t INVALID_MTIME = -1; |
| |
| int64_t mtime() const { return mtime_; } |
| |
| bool HasSubRanges() const { return !sub_ranges_.empty(); } |
| |
| private: |
| DISALLOW_COPY_AND_ASSIGN(ScanRange); |
| |
| ///////////////////////////////////////// |
| /// BEGIN: private members that are accessed by other io:: classes |
| friend class BufferDescriptor; |
| friend class DiskQueue; |
| friend class DiskIoMgr; |
| friend class DiskIoMgrTest; |
| friend class RequestContext; |
| friend class HdfsFileReader; |
| friend class LocalFileReader; |
| |
| // Tag for the buffer associated with range. See external_buffer_tag_ for details. |
| enum class ExternalBufferTag { CLIENT_BUFFER, CACHED_BUFFER, NO_BUFFER }; |
| |
| /// Initialize internal fields |
| void InitInternal(DiskIoMgr* io_mgr, RequestContext* reader); |
| |
| /// If data is cached, returns ok() and * read_succeeded is set to true. Also enqueues |
| /// a ready buffer from the cached data. |
| /// If the data is not cached, returns ok() and *read_succeeded is set to false. |
| /// Returns a non-ok status if it ran into a non-continuable error. |
| /// The reader lock must be held by the caller. |
| Status ReadFromCache(const std::unique_lock<std::mutex>& reader_lock, |
| bool* read_succeeded) WARN_UNUSED_RESULT; |
| |
| /// Add buffers for the range to read data into and schedule the range if blocked. |
| /// If 'returned' is true, the buffers returned from GetNext() that are being recycled |
| /// via ReturnBuffer(). Otherwise the buffers are newly allocated buffers to be added. |
| void AddUnusedBuffers( |
| std::vector<std::unique_ptr<BufferDescriptor>>&& buffers, bool returned); |
| |
| /// Called from a disk I/O thread to read the next buffer of data for this range. The |
| /// returned ReadOutcome describes what the result of the read was. 'disk_id' is the |
| /// ID of the disk queue. 'queue' is updated with the sizes and latencies of reads from |
| /// the underlying filesystem. Caller must not hold 'lock_'. |
| ReadOutcome DoRead(DiskQueue* queue, int disk_id); |
| |
| /// The function runs the actual read logic to read content with the specific reader. |
| /// If use_local_buffer is true, it will read from the local buffer with the local |
| /// buffer reader. |
| ReadOutcome DoReadInternal(DiskQueue* queue, int disk_id, bool use_local_buffer); |
| |
| /// Cleans up a buffer that was not returned to the client. |
| /// Either ReturnBuffer() or CleanUpBuffer() is called for every BufferDescriptor. |
| /// The caller must hold 'lock_' via 'scan_range_lock'. |
| /// This function may acquire 'file_reader_->lock()' |
| void CleanUpBuffer(const std::unique_lock<std::mutex>& scan_range_lock, |
| std::unique_ptr<BufferDescriptor> buffer); |
| |
| /// Same as CleanUpBuffer() except cleans up multiple buffers and caller must not |
| /// hold 'lock_'. |
| void CleanUpBuffers(std::vector<std::unique_ptr<BufferDescriptor>>&& buffers); |
| |
| /// Same as Cancel() except it doesn't remove the scan range from |
| /// reader_->active_scan_ranges_ or call WaitForInFlightRead(). This allows for |
| /// custom handling of in flight reads or active scan ranges. For example, this is |
| /// invoked by RequestContext::Cancel(), which removes the range itself to avoid |
| /// invalidating its active_scan_ranges_ iterator. It is also invoked by disk IO |
| /// threads to propagate a read error for a range that is in flight (i.e. when |
| /// read_error is true), so 'read_in_flight_' is set to false and threads in |
| /// WaitForInFlightRead() are woken up. Note that this is tearing down the FileReader, |
| /// so it may block waiting for other threads that are performing IO. |
| void CancelInternal(const Status& status, bool read_error); |
| |
| /// Marks the scan range as blocked waiting for a buffer. Caller must not hold 'lock_'. |
| void SetBlockedOnBuffer(); |
| |
| ExternalBufferTag external_buffer_tag() const { return external_buffer_tag_; } |
| |
| /// If non-OK, this scan range has been cancelled. This status is the reason for |
| /// cancellation - CANCELLED_INTERNALLY if cancelled without error, or another status |
| /// if an error caused cancellation. Note that a range can be cancelled without |
| /// cancelling the owning context. This means that ranges can be cancelled or hit errors |
| /// without aborting all scan ranges. |
| // |
| /// Writers must hold both 'lock_' and 'file_reader_->lock()'. Readers must hold either |
| /// 'lock_' or 'file_reader_->lock()'. This prevents the range from being cancelled |
| /// while any thread is inside a critical section. |
| Status cancel_status_; |
| |
| /// Only for testing |
| void SetFileReader(std::unique_ptr<FileReader> file_reader); |
| |
| /// END: private members that are accessed by other io:: classes |
| ///////////////////////////////////////// |
| |
| /// Enqueues a ready buffer with valid data for this range. This does not block. |
| /// The caller passes ownership of buffer to the scan range and it is not |
| /// valid to access buffer after this call. Returns false if the scan range was |
| /// cancelled. |
| bool EnqueueReadyBuffer(std::unique_ptr<BufferDescriptor> buffer); |
| |
| /// Get the read statistics from the Hdfs file handle and aggregate them to |
| /// the RequestContext. This clears the statistics on this file handle. |
| /// It is safe to pass hdfsFile by value, as hdfsFile's underlying type is a |
| /// pointer. |
| void GetHdfsStatistics(hdfsFile fh); |
| |
| /// Remove a buffer from 'unused_iomgr_buffers_' and update |
| /// 'unused_iomgr_buffer_bytes_'. If 'unused_iomgr_buffers_' is empty, return NULL. |
| /// 'lock_' must be held by the caller via 'scan_range_lock'. |
| std::unique_ptr<BufferDescriptor> GetUnusedBuffer( |
| const std::unique_lock<std::mutex>& scan_range_lock); |
| |
| /// Clean up all buffers in 'unused_iomgr_buffers_'. Only valid to call when the scan |
| /// range is cancelled or at eos. The caller must hold 'lock_' via 'scan_range_lock'. |
| void CleanUpUnusedBuffers(const std::unique_lock<std::mutex>& scan_range_lock); |
| |
| /// Waits for any in-flight read to complete. Called after CancelInternal() to ensure |
| /// no more reads will occur for the scan range. |
| void WaitForInFlightRead(); |
| |
| /// Returns true if no more buffers will be returned to clients in the future, |
| /// either because of hitting eosr or cancellation. |
| bool all_buffers_returned(const std::unique_lock<std::mutex>& lock) const { |
| DCHECK(lock.mutex() == &lock_ && lock.owns_lock()); |
| return !cancel_status_.ok() || (eosr_queued_ && ready_buffers_.empty()); |
| } |
| |
| /// Adds sub-ranges to this ScanRange. If sub_ranges is not empty, then ScanRange won't |
| /// read everything from its range, but will only read these sub-ranges. |
| /// Sub-ranges need to be ordered by 'offset' and cannot overlap with each other. |
| /// Doesn't need to merge continuous sub-ranges in advance, this method will do. |
| void InitSubRanges(std::vector<SubRange>&& sub_ranges); |
| |
| /// Read the sub-ranges into buffer and track the current position in 'sub_range_pos_'. |
| /// If cached data is available, then memcpy() from it instead of actually reading the |
| /// files. 'queue' is updated with the latencies and sizes of reads from the underlying |
| /// filesystem. |
| Status ReadSubRanges( |
| DiskQueue* queue, BufferDescriptor* buffer, bool* eof, FileReader* file_reader); |
| |
| /// Validates the internal state of this range. lock_ must be taken |
| /// before calling this. |
| bool Validate(); |
| |
| /// Validates the sub-ranges. All sub-range must be inside of this ScanRange. |
| /// They need to be ordered by offset and cannot overlap. |
| bool ValidateSubRanges(); |
| |
| /// Merges adjacent and continuous sub-ranges. |
| void MergeSubRanges(); |
| |
| /// Pointer to caller specified metadata. This is untouched by the io manager |
| /// and the caller can put whatever auxiliary data in here. |
| void* meta_data_ = nullptr; |
| |
| /// Options for enabling HDFS caching and data cache. |
| int cache_options_; |
| |
| /// If true, we expect this scan range to be a local read. Note that if this is false, |
| /// it does not necessarily mean we expect the read to be remote, and that we never |
| /// create scan ranges where some of the range is expected to be remote and some of it |
| /// local. |
| /// TODO: we can do more with this |
| bool expected_local_ = false; |
| |
| /// Last modified time of the file associated with the scan range. Set in Reset(). |
| int64_t mtime_; |
| |
| DiskIoMgr* io_mgr_ = nullptr; |
| |
| /// Reader/owner of the scan range |
| RequestContext* reader_ = nullptr; |
| |
| /// Tagged union that holds a buffer for the cases when there is a buffer allocated |
| /// externally from DiskIoMgr that is associated with the scan range. |
| ExternalBufferTag external_buffer_tag_; |
| |
| /// Valid if the 'external_buffer_tag_' is CLIENT_BUFFER. |
| struct { |
| /// Client-provided buffer to read the whole scan range into. |
| uint8_t* data = nullptr; |
| |
| /// Length of the client-provided buffer. |
| int64_t len = 0; |
| } client_buffer_; |
| |
| /// Valid if reading file contents from cache was successful. |
| struct { |
| /// Pointer to the contents of the file. |
| uint8_t* data = nullptr; |
| /// Length of the contents. |
| int64_t len = 0; |
| } cache_; |
| |
| /// The number of buffers that have been returned to a client via GetNext() that have |
| /// not yet been returned with ReturnBuffer(). |
| AtomicInt32 num_buffers_in_reader_{0}; |
| |
| /// Lock protecting fields below. |
| /// This lock should not be taken during FileReader::Open()/Read()/Close(). |
| /// If RequestContext::lock_ and this lock need to be held simultaneously, |
| /// RequestContext::lock_ must be taken first. |
| std::mutex lock_; |
| |
| /// Buffers to read into, used if the 'external_buffer_tag_' is NO_BUFFER. These are |
| /// initially populated when the client calls AllocateBuffersForRange() and |
| /// and are used to read scanned data into. Buffers are taken from this vector for |
| /// every read and added back, if needed, when the client calls ReturnBuffer(). |
| std::vector<std::unique_ptr<BufferDescriptor>> unused_iomgr_buffers_; |
| |
| /// Total number of bytes of buffers in 'unused_iomgr_buffers_'. |
| int64_t unused_iomgr_buffer_bytes_ = 0; |
| |
| /// Cumulative bytes of I/O mgr buffers taken from 'unused_iomgr_buffers_' by DoRead(). |
| /// Used to infer how many bytes of buffers need to be held onto to read the rest of |
| /// the scan range. |
| int64_t iomgr_buffer_cumulative_bytes_used_ = 0; |
| |
| /// True if a disk thread is currently doing a read for this scan range. Set to true in |
| /// DoRead() and set to false in EnqueueReadyBuffer() or CancelInternal() when the |
| /// read completes and any buffer used for the read is either enqueued or freed. |
| bool read_in_flight_ = false; |
| |
| /// If true, the last buffer for this scan range has been queued. |
| /// If this is true and 'ready_buffers_' is empty, then no more buffers will be |
| /// returned to the caller by this scan range. |
| bool eosr_queued_ = false; |
| |
| /// If true, this scan range is not scheduled because a buffer is not available for |
| /// the next I/O in the range. This can happen when the scan range is initially created |
| /// or if the buffers added to the range have all been filled with data an not yet |
| /// returned. |
| bool blocked_on_buffer_ = false; |
| |
| /// IO buffers that are queued for this scan range. When Cancel() is called |
| /// this is drained by the cancelling thread. I.e. this is always empty if |
| /// 'cancel_status_' is not OK. |
| std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_; |
| |
| /// Condition variable for threads in GetNext() that are waiting for the next buffer |
| /// and threads in WaitForInFlightRead() that are waiting for a read to finish. |
| /// Signalled when a buffer is enqueued in 'ready_buffers_' or the scan range is |
| /// cancelled. |
| ConditionVariable buffer_ready_cv_; |
| |
| /// Number of bytes read by this scan range. |
| int64_t bytes_read_ = 0; |
| |
| /// Polymorphic object that is responsible for doing file operations. |
| std::unique_ptr<FileReader> file_reader_; |
| |
| /// Polymorphic object that is responsible for doing file operations if the path |
| /// is a remote url and the file is in the local buffer. |
| std::unique_ptr<FileReader> local_buffer_reader_; |
| |
| /// If set to true, the scan range is using local_buffer_reader_ to do scan operations. |
| /// The flag is set during DoRead(). If the path is a remote path and the file has |
| /// a local buffer, the flag is set to true, otherwise the flag is false. |
| bool use_local_buffer_ = false; |
| |
| /// If not empty, the ScanRange will only read these parts from the file. |
| std::vector<SubRange> sub_ranges_; |
| |
| /// The file handle of the physical file to be accessed. |
| DiskFile* disk_file_ = nullptr; |
| |
| /// The file handle of the physical buffer file to be accessed. |
| DiskFile* disk_buffer_file_ = nullptr; |
| |
| // Read position in the sub-ranges. |
| struct SubRangePosition { |
| /// Index of SubRange in 'ScanRange::sub_ranges_' to read next |
| int64_t index = 0; |
| /// Bytes already read from 'ScanRange::sub_ranges_[sub_range_index]' |
| int64_t bytes_read = 0; |
| }; |
| |
| /// Current read position in the sub-ranges. |
| SubRangePosition sub_range_pos_; |
| |
| /// Number of bytes need to be read by this ScanRange. If there are no sub-ranges it |
| /// equals to 'len_'. If there are sub-ranges then it equals to the sum of the lengths |
| /// of the sub-ranges (which is less than or equal to 'len_'). |
| int64_t bytes_to_read_ = 0; |
| }; |
| |
| /// Used to specify data to be written to a file and offset. |
| /// It is the responsibility of the client to ensure that the data to be written is |
| /// valid and that the file to be written to exists until the callback is invoked. |
| /// A callback is invoked to inform the client when the write is done. |
| class WriteRange : public RequestRange { |
| public: |
| /// This callback is invoked on each WriteRange after the write is complete or the |
| /// context is cancelled. The status returned by the callback parameter indicates |
| /// if the write was successful (i.e. Status::OK), if there was an error |
| /// TErrorCode::RUNTIME_ERROR) or if the context was cancelled |
| /// (TErrorCode::CANCELLED_INTERNALLY). The callback is only invoked if this |
| /// WriteRange was successfully added (i.e. AddWriteRange() succeeded). No locks are |
| /// held while the callback is invoked. |
| typedef std::function<void(const Status&)> WriteDoneCallback; |
| WriteRange(const std::string& file, int64_t file_offset, int disk_id, |
| WriteDoneCallback callback); |
| |
| /// Change the file and offset of this write range. Data and callbacks are unchanged. |
| /// Can only be called when the write is not in flight (i.e. before AddWriteRange() |
| /// is called or after the write callback was called). |
| void SetRange(const std::string& file, int64_t file_offset, int disk_id); |
| |
| /// Set the data and number of bytes to be written for this WriteRange. |
| /// Can only be called when the write is not in flight (i.e. before AddWriteRange() |
| /// is called or after the write callback was called). |
| void SetData(const uint8_t* buffer, int64_t len); |
| |
| /// Set the offset of this WriteRange. |
| /// Caller should guarantee the thread safe of calling the function. |
| void SetOffset(int64_t file_offset); |
| |
| /// Set the DiskFile pointer which the WriteRange belongs to. |
| /// Can only be called when the write is not in flight (i.e. before AddWriteRange() |
| /// is called or after the write callback was called). |
| void SetDiskFile(DiskFile* disk_file) { disk_file_ = disk_file; } |
| |
| /// Set the IO context to this WriteRange. |
| /// Can only be called when the write is not in flight (i.e. before AddWriteRange() |
| /// is called or after the write callback was called). |
| void SetRequestContext(RequestContext* io_ctx) { io_ctx_ = io_ctx; } |
| |
| /// Execute writing the this range to the corresponding file. |
| Status DoWrite(); |
| |
| /// Handle the status of Function DoWrite(). |
| Status DoWriteEnd(DiskQueue* queue, const Status& ret_status); |
| |
| /// Return the data to be written. |
| const uint8_t* data() const { return data_; } |
| |
| /// Return the disk file pointer of the write range. |
| DiskFile* disk_file() const { return disk_file_; } |
| |
| RequestContext* io_ctx() const { return io_ctx_; } |
| |
| /// Return if the disk file that the write range belongs to is completed. |
| bool is_full() const { return is_full_; } |
| |
| WriteDoneCallback callback() const { return callback_; } |
| |
| private: |
| DISALLOW_COPY_AND_ASSIGN(WriteRange); |
| |
| friend class LocalFileWriter; |
| |
| /// Data to be written. RequestRange::len_ contains the length of data |
| /// to be written. |
| const uint8_t* data_; |
| |
| /// Callback to invoke after the write is complete. |
| WriteDoneCallback callback_; |
| |
| /// IO context is to help to find out the disk queue and corresponding metrics. |
| RequestContext* io_ctx_; |
| |
| /// The DiskFile is which the WriteRange writes into. |
| /// A DiskFile can have multiple WriteRanges. |
| DiskFile* disk_file_ = nullptr; |
| |
| /// Indicate if the file which the write range belongs to is full after writing. |
| bool is_full_ = false; |
| }; |
| |
| class RemoteOperRange : public RequestRange { |
| public: |
| /// This callback is invoked on each RemoteOperRange after the operation is complete |
| /// or the context is cancelled. The status returned by the callback parameter indicates |
| /// if the operation was successful (i.e. Status::OK), if there was an error |
| /// TErrorCode::RUNTIME_ERROR) or if the context was cancelled |
| /// (TErrorCode::CANCELLED_INTERNALLY). The callback is only invoked if this |
| /// RemoteOperRange was successfully added (i.e. AddRemoteOperRange() succeeded). |
| /// No locks are held while the callback is invoked. |
| typedef std::function<void(const Status&)> RemoteOperDoneCallback; |
| RemoteOperRange(DiskFile* src_file, DiskFile* dst_file, int64_t file_offset, |
| int disk_id, RequestType::type type, DiskIoMgr* io_mgr, |
| RemoteOperDoneCallback callback); |
| |
| /// Called from a disk I/O thread to do the file operation of this range. The |
| /// returned Status describes what the result of the read was. 'buff' is the |
| /// block buffer which is used for file operations. 'buff_size' is the size of the |
| /// block buffer. Caller must not hold 'lock_'. |
| Status DoOper(uint8_t* buff, int64_t buff_size); |
| |
| int64_t block_size() { return block_size_; } |
| |
| RemoteOperDoneCallback callback() const { return callback_; } |
| |
| private: |
| DISALLOW_COPY_AND_ASSIGN(RemoteOperRange); |
| |
| friend class LocalFileWriter; |
| |
| /// Callback to invoke after the operation is complete. |
| RemoteOperDoneCallback callback_; |
| |
| /// IO manager is to help to find out the disk queue and corresponding metrics. |
| DiskIoMgr* io_mgr_; |
| |
| /// disk_file_src_ contains the information of the file which is the source to |
| /// do the file operation. For example, if the operation is file upload, this |
| /// is the handle for the source physical file to be uploaded. |
| DiskFile* disk_file_src_; |
| |
| /// disk_file_dst_ contains the information of the file which is the destination |
| /// of the file operation. |
| DiskFile* disk_file_dst_; |
| |
| /// block size to do the file operation. |
| int64_t block_size_; |
| |
| /// Execute the upload file operation. |
| Status DoUpload(uint8_t* buff, int64_t buff_size); |
| }; |
| |
| inline bool BufferDescriptor::is_cached() const { |
| return scan_range_->external_buffer_tag_ |
| == ScanRange::ExternalBufferTag::CACHED_BUFFER; |
| } |
| |
| inline bool BufferDescriptor::is_client_buffer() const { |
| return scan_range_->external_buffer_tag_ |
| == ScanRange::ExternalBufferTag::CLIENT_BUFFER; |
| } |
| } |
| } |