blob: 0bb8d740a63edd41e5c9a949077dbdd7403add1d [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <deque>
#include <boost/cstdint.hpp>
#include <boost/scoped_ptr.hpp>
#include "common/compiler-util.h"
#include "common/status.h"
#include "exec/filter-context.h"
#include "runtime/bufferpool/buffer-pool.h"
#include "runtime/io/request-ranges.h"
namespace impala {
/// Forward declarations of Impala types, so that this header does not have to include
/// the headers that define them.
struct HdfsFileDesc;
class HdfsPartitionDescriptor;
class HdfsScanNodeBase;
class MemPool;
class RowBatch;
class RuntimeState;
class StringBuffer;
class Tuple;
class TupleRow;
/// This class abstracts over getting buffers from the IoMgr. Each ScannerContext is 1:1
/// with a HdfsScanner. ScannerContexts contain Streams, which are 1:1 with a ScanRange.
/// Columnar formats have multiple streams per context object.
/// This class handles stitching data split across IO buffers and providing
/// some basic parsing utilities.
/// This class is *not* thread safe. It is designed to have a single scanner thread
/// reading from it.
/// Each scanner context maps to a single hdfs split. There are three threads that
/// are interacting with the context.
/// 1. IoMgr threads that read io buffers from the disk and enqueue them to the
/// stream's underlying ScanRange object. This is the producer.
/// 2. Scanner thread that calls GetBytes() (which can block), materializing tuples
/// from processing the bytes. This is the consumer.
/// 3. The scan node/main thread which calls into the context to trigger cancellation
/// or other end of stream conditions.
/// Memory management
/// =================
/// Pointers into memory returned from stream methods remain valid until either
/// ReleaseCompletedResources() is called or an operation advances the stream's read
/// offset past the end of the memory.
/// E.g. if ReadBytes(peek=false) is called, the memory returned is invalidated when
/// ReadBytes(), SkipBytes(), ReadVInt(), etc. is called. If the memory is obtained by
/// a "peeking" operation, then the memory returned remains valid until the read offset
/// in the stream is advanced past the end of the memory. E.g. if
/// ReadBytes(n, peek=true) is called, then the memory remains valid if SkipBytes(n)
/// is called the first time, but not if SkipBytes() is called again to advance further.
/// Each stream only requires a single I/O buffer to make progress on reading through the
/// stream. Additional I/O buffers allow the I/O manager to read ahead in the scan range.
/// The scanner context also allocates memory from a MemPool for reads that straddle I/O
/// buffers (e.g. a small read at the boundary of I/O buffers or a read larger than
/// a single I/O buffer). The amount of memory allocated from the MemPool is determined
/// by the maximum buffer size read from the stream, plus some overhead. E.g.
/// ReadBytes(length=50KB) requires allocating a 50KB buffer from the MemPool if the
/// read straddles a buffer boundary.
/// TODO: Some of the synchronization mechanisms such as cancelled() can be removed
/// once the legacy hdfs scan node has been removed.
class ScannerContext {
/// Create a scanner context with the parent scan_node (where materialized row batches
/// get pushed to) and the scan range to process. Buffers are allocated using
/// 'bp_client'. 'total_reservation' bytes of 'bp_client''s reservation has been
/// initally allotted for use by this scanner.
ScannerContext(RuntimeState* state, HdfsScanNodeBase* scan_node,
BufferPool::ClientHandle* bp_client, int64_t total_reservation,
HdfsPartitionDescriptor* partition_desc,
const std::vector<FilterContext>& filter_ctxs,
MemPool* expr_results_pool);
/// Destructor verifies that all stream objects have been released.
/// Encapsulates a stream (continuous byte range) that can be read. A context
/// can contain one or more streams. For non-columnar files, there is only
/// one stream; for columnar, there is one stream per column.
class Stream {
/// Returns up to requested_len bytes or an error. This can block if bytes are not
/// available.
/// - requested_len is the number of bytes requested. This function will return
/// that number of bytes unless end of file or an error occurred.
/// - If peek is true, the scan range position is not incremented (i.e. repeated calls
/// with peek = true will return the same data).
/// - *buffer on return is a pointer to the buffer. The memory is owned by
/// the ScannerContext and should not be modified. If the buffer is entirely
/// from one disk io buffer, a pointer inside that buffer is returned directly.
/// If the requested buffer straddles io buffers, a copy is done here.
/// - *out_len is the number of bytes returned.
/// - *status is set if there is an error.
/// Returns true if the call was successful (i.e. status->ok())
/// This should only be called from the scanner thread.
/// Note that this will return bytes past the end of the scan range until the end of
/// the file.
bool GetBytes(int64_t requested_len, uint8_t** buffer, int64_t* out_len,
Status* status, bool peek = false) WARN_UNUSED_RESULT;
/// Gets the bytes from the first available buffer within the scan range. This may be
/// the boundary buffer used to stitch IO buffers together.
/// If we are past the end of the scan range, no bytes are returned.
Status GetBuffer(bool peek, uint8_t** buffer, int64_t* out_len);
/// Callback that returns the buffer size to use when reading past the end of the scan
/// range. Reading past the end of the scan range is likely a remote read, so we want
/// to find a good trade-off between io requests and data volume. Scanners that have
/// some information about the optimal read size can provide this callback to
/// override the default read-size doubling strategy (see GetNextBuffer()). If the
/// callback returns a positive length, this overrides the default strategy. If the
/// callback returns a length greater than the max read size, the max read size will
/// be used.
/// The callback takes the file offset of the asynchronous read (this may be more
/// than file_offset() due to data being assembled in the boundary buffer).
using ReadPastSizeCallback = boost::function<int (int64_t)>;

/// Installs 'cb' as the callback used to choose the read size when reading past the
/// end of the scan range. The callback is stored by copy in 'read_past_size_cb_'.
void set_read_past_size_cb(ReadPastSizeCallback cb) {
  read_past_size_cb_ = cb;
}
/// Return the number of bytes left in the range for this stream.
int64_t bytes_left() { return scan_range_->len() - total_bytes_returned_; }
/// If true, all bytes in this scan range have been returned from this ScannerContext
/// to callers or we hit eof before reaching the end of the scan range. Callers can
/// continue to call Read*()/Get*()/Skip*() methods on the stream until eof() is true.
bool eosr() const { return total_bytes_returned_ >= scan_range_->len() || eof(); }
/// If true, the stream has reached the end of the file. After this is true, any
/// Read*()/Get*()/Skip*() methods will not succeed.
bool eof() const { return file_offset() == file_len_; }
const char* filename() { return scan_range_->file(); }
const io::ScanRange* scan_range() { return scan_range_; }
const HdfsFileDesc* file_desc() { return file_desc_; }
int64_t reservation() const { return reservation_; }
/// Returns the buffer's current offset in the file.
int64_t file_offset() const { return scan_range_->offset() + total_bytes_returned_; }
/// Returns the total number of bytes returned
int64_t total_bytes_returned() { return total_bytes_returned_; }
/// Read a Boolean primitive value written using Java serialization. Returns true
/// on success, otherwise returns false and sets 'status' to indicate the error.
/// Equivalent to java.io.DataInput.readBoolean().
bool ReadBoolean(bool* boolean, Status* status) WARN_UNUSED_RESULT;
/// Read an Integer primitive value written using Java serialization. Returns true
/// on success, otherwise returns false and sets 'status' to indicate the error.
/// Equivalent to java.io.DataInput.readInt().
bool ReadInt(int32_t* val, Status* status, bool peek = false) WARN_UNUSED_RESULT;
/// Read a variable-length Long value written using Writable serialization. Returns
/// true on success, otherwise returns false and sets 'status' to indicate the error.
/// Ref: org.apache.hadoop.io.WritableUtils.readVLong()
bool ReadVLong(int64_t* val, Status* status) WARN_UNUSED_RESULT;
/// Read a variable length Integer value written using Writable serialization. Returns
/// true on success, otherwise returns false and sets 'status' to indicate the error.
/// Ref: org.apache.hadoop.io.WritableUtils.readVInt()
bool ReadVInt(int32_t* val, Status* status) WARN_UNUSED_RESULT;
/// Read a zigzag encoded long. Returns true on success, otherwise returns false and
/// sets 'status' to indicate the error.
bool ReadZLong(int64_t* val, Status* status) WARN_UNUSED_RESULT;
/// Skip over the next length bytes in the specified HDFS file. Returns true on
/// success, otherwise returns false and sets 'status' to indicate the error.
bool SkipBytes(int64_t length, Status* status) WARN_UNUSED_RESULT;
/// Read length bytes into the supplied buffer. The returned buffer is owned
/// by this object. Returns true on success, otherwise returns false and sets 'status'
/// to indicate the error.
bool ReadBytes(int64_t length, uint8_t** buf, Status* status, bool peek = false)
/// Read a Writable Text value from the supplied file. Returns true on success,
/// otherwise returns false and sets 'status' to indicate the error.
/// Ref: org.apache.hadoop.io.WritableUtils.readString()
/// The returned buffer is owned by this object.
bool ReadText(uint8_t** buf, int64_t* length, Status* status) WARN_UNUSED_RESULT;
/// Skip this text object. Returns true on success, otherwise returns false and
/// sets 'status' to indicate the error.
bool SkipText(Status* status) WARN_UNUSED_RESULT;
/// Release completed resources, e.g. the last buffer if the current read position is
/// at the end of the buffer. If 'done' is true all resources are freed, even if the
/// caller has not read that data yet. After calling this function, any memory
/// returned from previous Read*()/Get*() functions is invalid to reference.
/// Also see the ScannerContext::ReleaseCompletedResources() comment.
void ReleaseCompletedResources(bool done);
friend class ScannerContext;
/// The ScannerContext this stream belongs to. Presumably the context that created the
/// stream via ScannerContext::AddStream() - TODO confirm against the constructor.
ScannerContext* const parent_;
/// The scan range this stream reads from; its offset and length are used to compute
/// file_offset(), bytes_left() and eosr().
io::ScanRange* const scan_range_;
/// Descriptor of the file being read. 'file_len_' is initialized from its file_length.
const HdfsFileDesc* const file_desc_;
/// Reservation given to this stream for allocating I/O buffers. The reservation is
/// shared with 'scan_range_', so the context must be careful not to use this until
/// all of 'scan_range_''s buffers have been freed. Must be >= the minimum IoMgr
/// buffer size to allow reading past the end of 'scan_range_'.
const int64_t reservation_;
/// Total number of bytes returned from GetBytes()
int64_t total_bytes_returned_ = 0;
/// File length. Initialized with file_desc_->file_length but updated if eof is found
/// earlier, i.e. the file was truncated.
int64_t file_len_;
/// Callback if a scanner wants to implement custom logic for guessing how far to
/// read past the end of the scan range.
ReadPastSizeCallback read_past_size_cb_;
/// The next amount we should read past the end of the file, if using the default
/// doubling algorithm. Unused if 'read_past_size_cb_' is set.
int64_t next_read_past_size_bytes_;
/// The current I/O buffer. NULL before we've read any bytes or if the last read
/// I/O buffer was released.
std::unique_ptr<io::BufferDescriptor> io_buffer_;
/// True if 'scan_range_' returned eosr, which means that we read to the end of that
/// scan range. This is different from eosr() because it tracks whether the
/// scan range reached eosr, not whether eosr() was returned to the caller.
bool scan_range_eosr_ = false;
/// Next byte to read in io_buffer_
uint8_t* io_buffer_pos_ = nullptr;
/// Bytes left in io_buffer_
int64_t io_buffer_bytes_left_ = 0;
/// The boundary buffer is used to copy multiple IO buffers from the scan range into a
/// single buffer to return to the scanner. After copying all or part of an IO buffer
/// into the boundary buffer, the current buffer's state is updated to no longer
/// include the copied bytes (e.g., io_buffer_bytes_left_ is decremented).
/// Conceptually, the data in the boundary buffer always comes before that in the
/// current buffer, and all the bytes in the stream are either already returned to the
/// scanner, in the current IO buffer, or in the boundary buffer.
boost::scoped_ptr<MemPool> boundary_pool_;
boost::scoped_ptr<StringBuffer> boundary_buffer_;
uint8_t* boundary_buffer_pos_ = nullptr;
int64_t boundary_buffer_bytes_left_ = 0;
/// Points to either io_buffer_pos_ or boundary_buffer_pos_
/// (initialized to NULL before calling GetBytes())
uint8_t** output_buffer_pos_ = nullptr;
/// We always want output_buffer_bytes_left_ to be non-NULL, so we can avoid a NULL
/// check in GetBytes(). We use this variable, which is set to 0, to initialize
/// output_buffer_bytes_left_. After the first successful call to GetBytes(),
/// output_buffer_bytes_left_ will be set to something else.
static const int64_t OUTPUT_BUFFER_BYTES_LEFT_INIT = 0;

/// Points to either io_buffer_bytes_left_ or boundary_buffer_bytes_left_
/// (initialized to a static zero-value int before calling GetBytes()).
/// The initial pointee is const: it may only be read, never written, through this
/// pointer until the pointer has been redirected to one of the mutable counters.
/// NOTE(review): taking the address odr-uses the constant, so an out-of-class
/// definition is also required - confirm one exists in the .cc file.
int64_t* output_buffer_bytes_left_ =
    const_cast<int64_t*>(&OUTPUT_BUFFER_BYTES_LEFT_INIT);
/// Private constructor. See AddStream() for public API.
Stream(ScannerContext* parent, io::ScanRange* scan_range, int64_t reservation,
const HdfsFileDesc* file_desc);
/// GetBytes helper to handle the slow path.
/// If 'peek' is true then return the data but do not move the current offset.
/// If 'peek' is not true, the returned buffer memory remains valid until next
/// operation that reads from the stream.
Status GetBytesInternal(int64_t requested_len, uint8_t** buffer, bool peek,
int64_t* out_len);
/// SkipBytes() helper to handle the slow path where we need to skip past the
/// current I/O buffer. Called when the current I/O and boundary buffers are
/// exhausted. Skips 'bytes_left' bytes in subsequent I/O buffers. 'length' is the
/// argument to the SkipBytes() call, used for error reporting. Sets 'io_buffer_',
/// 'io_buffer_pos_', 'io_buffer_bytes_left_' and 'total_bytes_returned_'.
bool SkipBytesInternal(int64_t length, int64_t bytes_left, Status* status);
/// Copy 'num_bytes' bytes from the I/O buffer at 'io_buffer_pos_' to the
/// boundary buffer and set 'output_buffer_pos_' and 'output_buffer_bytes_left_'
/// to point at the boundary buffer variables. Advances 'io_buffer_pos_' and
/// 'io_buffer_bytes_left_' by 'num_bytes'. Returns an error if the boundary
/// buffer cannot be extended to fit the new data.
/// Returns 'io_buffer_' to the I/O manager if all its data was copied to the
/// boundary buffer.
Status CopyIoToBoundary(int64_t num_bytes);
/// Returns 'io_buffer_' to the I/O manager, setting it to NULL in the process,
/// and resets 'io_buffer_bytes_left_' and 'io_buffer_pos_'.
void ReturnIoBuffer();
/// Gets (and blocks) for the next io buffer. After fetching all buffers in the scan
/// range, performs synchronous reads past the scan range until EOF.
/// When performing a synchronous read, the read size is the max of 'read_past_size'
/// and either the result of read_past_size_cb_(), or the result of iteratively
/// doubling INIT_READ_PAST_SIZE up to the max read size. 'read_past_size' is not
/// used otherwise. This is done to find a balance between reading too much data
/// and issuing too many small reads.
/// Updates 'io_buffer_', 'io_buffer_bytes_left_', and 'io_buffer_pos_'. If
/// GetNextBuffer() is called after all bytes in the file have been returned,
/// 'io_buffer_bytes_left_' will be set to 0. In the non-error case, 'io_buffer_' is
/// never set to NULL, even if it contains 0 bytes.
Status GetNextBuffer(int64_t read_past_size = 0);
/// Helper to advance position and bytes left for a buffer by 'bytes'.
void AdvanceBufferPos(
int64_t bytes, uint8_t** buffer_pos, int64_t* buffer_bytes_left);
/// Validates that the output buffer pointers point to the correct buffer.
bool ValidateBufferPointers() const;
/// Error-reporting functions.
Status ReportIncompleteRead(int64_t length, int64_t bytes_read);
Status ReportInvalidRead(int64_t length);
Status ReportInvalidInt();
/// Returns the stream at position 'idx' in 'streams_' (the first stream by default).
/// 'idx' must be in the range [0, NumStreams()); this is only enforced by DCHECKs.
/// The returned stream remains owned by this context.
Stream* GetStream(int idx = 0) {
  DCHECK_GE(idx, 0);
  DCHECK_LT(idx, streams_.size());
  return streams_[idx].get();
}
/// Returns the number of streams currently held in 'streams_'.
int NumStreams() const {
  const int stream_count = streams_.size();
  return stream_count;
}
/// Tries to increase 'total_reservation()' to 'ideal_reservation'. May get
/// none, part or all of the requested increase. total_reservation() can be
/// checked by the caller to find out the new total reservation. When this
/// ScannerContext is destroyed, the scan node takes back ownership of
/// total_reservation().
void TryIncreaseReservation(int64_t ideal_reservation);
/// Release completed resources for all streams, e.g. the last buffer in each stream if
/// the current read position is at the end of the buffer. If 'done' is true all
/// resources are freed, even if the caller has not read that data yet. After calling
/// this function, any memory returned from previous Read*()/Get*() functions is
/// invalid to reference. Callers which want to clear the streams from the context
/// should also call ClearStreams().
/// This must be called with 'done' set when the scanner is complete and no longer needs
/// any resources. After calling with 'done' set, this should be called again if new
/// streams are created via AddStream().
void ReleaseCompletedResources(bool done);
/// Releases all the Stream objects in the vector 'streams_' and reduces the vector's
/// size to 0.
void ClearStreams();
/// Add a stream to this ScannerContext for 'range'. 'range' must already have any
/// buffers that it needs allocated. 'reservation' is the amount of reservation that
/// is given to this stream for allocating I/O buffers. The reservation is shared with
/// 'range', so the context must be careful not to use this until all of 'range's
/// buffers have been freed. Must be >= the minimum IoMgr buffer size to allow reading
/// past the end of 'range'. 'reservation' must be <=
/// ScannerContext::total_reservation(), i.e. this reservation is included in the total.
/// Returns the added stream. The returned stream is owned by this context.
Stream* AddStream(io::ScanRange* range, int64_t reservation);
/// Returns true if RuntimeState::is_cancelled() is true, or if scan node is not
/// multi-threaded and is done (finished, cancelled or reached its limit).
/// In all other cases returns false.
bool cancelled() const;
/// Returns the buffer pool client used to allocate I/O buffers.
BufferPool::ClientHandle* bp_client() const {
  return bp_client_;
}

/// Returns the total reservation from 'bp_client_' that this scanner is allowed to use.
int64_t total_reservation() const {
  return total_reservation_;
}

/// Returns the partition descriptor held by this context.
HdfsPartitionDescriptor* partition_descriptor() const {
  return partition_desc_;
}

/// Returns the filter contexts for all filters applicable to this scan.
const std::vector<FilterContext>& filter_ctxs() const {
  return filter_ctxs_;
}

/// Returns the MemPool that holds results of expression evaluation in the scanner.
MemPool* expr_results_pool() const {
  return expr_results_pool_;
}
friend class Stream;
/// Runtime state for this scan. cancelled() is documented to report
/// RuntimeState::is_cancelled(); presumably that is queried through this pointer.
RuntimeState* const state_;
/// The parent scan node. Per the constructor comment, materialized row batches get
/// pushed to it.
HdfsScanNodeBase* const scan_node_;
/// Buffer pool client used to allocate I/O buffers. This is accessed by multiple
/// threads in the multi-threaded scan node, so those threads must take care to only
/// call thread-safe BufferPool methods with this client.
BufferPool::ClientHandle* const bp_client_;
/// Total reservation from 'bp_client_' that this scanner is allowed to use.
/// TODO: when we remove the multi-threaded scan node, we may be able to just use
/// bp_client_->Reservation()
int64_t total_reservation_;
HdfsPartitionDescriptor* const partition_desc_;
/// Vector of streams. Non-columnar formats will always have one stream per context.
std::vector<std::unique_ptr<Stream>> streams_;
/// Filter contexts for all filters applicable to this scan. Memory attached to the
/// context is owned by the scan node.
std::vector<FilterContext> filter_ctxs_;
/// MemPool used for allocations that hold results of expression evaluation in the
/// scanner and 'filter_ctxs_'. Must be thread-local since MemPool is not thread-safe.
/// Owned by ScannerThread() in the multi-threaded scan node and by the ExecNode in the
/// single-threaded scan node implementation.
/// The scanner is responsible for clearing the MemPool periodically after expression
/// evaluation to prevent memory from accumulating.
/// TODO: IMPALA-6015: it should be possible to simplify the lifecycle of this pool and
/// filter_ctxs_ once the multithreaded scan node is removed.
MemPool* const expr_results_pool_;