| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #ifndef IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_H |
| #define IMPALA_RUNTIME_BUFFERED_TUPLE_STREAM_H |
| |
| #include <vector> |
| #include <set> |
| |
| #include "common/status.h" |
| #include "runtime/buffered-block-mgr.h" |
| #include "runtime/row-batch.h" |
| |
| namespace impala { |
| |
| class BufferedBlockMgr; |
| class RuntimeProfile; |
| class RuntimeState; |
| class RowDescriptor; |
| class SlotDescriptor; |
| class TupleRow; |
| |
| /// Class that provides an abstraction for a stream of tuple rows. Rows can be |
| /// added to the stream and returned. Rows are returned in the order they are added. |
| /// |
| /// The underlying memory management is done by the BufferedBlockMgr. |
| /// |
| /// The tuple stream consists of a number of small (less than IO-sized) blocks before |
| /// an arbitrary number of IO-sized blocks. The smaller blocks do not spill and are |
| /// there to lower the minimum buffering requirements. For example, an operator that |
| /// needs to maintain 64 streams (1 buffer per partition) would need, by default, |
| /// 64 * 8MB = 512MB of buffering. A query with 5 of these operators would require |
| /// 2.56GB just to run, regardless of how much of that is used. This is |
| /// problematic for small queries. Instead we will start with a fixed number of small |
| /// buffers (currently 2 small buffers: one 64KB and one 512KB) and only start using IO |
| /// sized buffers when those fill up. The small buffers never spill. |
| /// The stream will *not* automatically switch from using small buffers to IO-sized |
| /// buffers when all the small buffers for this stream have been used. |
| /// |
| /// The BufferedTupleStream is *not* thread safe from the caller's point of view. It is |
| /// expected that all the APIs are called from a single thread. Internally, the |
| /// object is thread safe with respect to the underlying block mgr. |
| /// |
| /// Buffer management: |
| /// The stream is either pinned or unpinned, set via PinStream() and UnpinStream(). |
| /// Blocks are optionally deleted as they are read, set with the delete_on_read argument |
| /// to PrepareForRead(). |
| /// |
| /// Block layout: |
| /// If the stream's tuples are nullable (i.e. has_nullable_tuple_ is true), there is a |
| /// bitstring at the start of each block with null indicators for all tuples in each row |
| /// in the block. The length of the bitstring is a function of the block size. Row data |
| /// is stored after the null indicators if present, or at the start of the block |
| /// otherwise. Rows are stored back to back in the stream, with no interleaving of data |
| /// from different rows. There is no padding or alignment between rows. |
| /// |
| /// Null tuples: |
| /// The order of bits in the null indicators bitstring corresponds to the order of |
| /// tuples in the block. The NULL tuples are not stored in the row itself, only as set |
| /// bits in the null indicators bitstring. |
| /// |
| /// Tuple row layout: |
| /// The fixed length parts of the row's tuples are stored first, followed by var len data |
| /// for inlined_string_slots_ and inlined_coll_slots_. Other "external" var len slots can |
| /// point to var len data outside the stream. When reading the stream, the length of each |
| /// row's var len data in the stream must be computed to find the next row's start. |
| /// |
| /// The tuple stream supports reading from the stream into RowBatches without copying |
| /// out any data: the RowBatches' Tuple pointers will point directly into the stream's |
| /// blocks. The fixed length parts follow Impala's internal tuple format, so for the |
| /// tuple to be valid, we only need to update pointers to point to the var len data |
| /// in the stream. These pointers need to be updated by the stream because a spilled |
| /// block may be relocated to a different location in memory. The pointers are updated |
| /// lazily upon reading the stream via GetNext() or GetRows(). |
| /// |
| /// Example layout for a row with two tuples ((1, "hello"), (2, "world")) with all var |
| /// len data stored in the stream: |
| /// <---- tuple 1 -----> <------ tuple 2 ------> <- var len -> <- next row ... |
| /// +--------+-----------+-----------+-----------+-------------+ |
| /// | IntVal | StringVal | BigIntVal | StringVal | | ... |
| /// +--------+-----------+-----------+-----------++------------+ |
| /// | val: 1 | len: 5 | val: 2 | len: 5 | helloworld | ... |
| /// | | ptr: 0x.. | | ptr: 0x.. | | ... |
| /// +--------+-----------+-----------+-----------+-------------+ |
| /// <--4b--> <---12b---> <----8b---> <---12b---> <----10b----> |
| /// |
| /// Example layout for a row with a single tuple (("hello", "world")) with the second |
| /// string slot stored externally to the stream: |
| /// <------ tuple 1 ------> <- var len -> <- next row ... |
| /// +-----------+-----------+-------------+ |
| /// | StringVal | StringVal | | ... |
| /// +-----------+-----------+-------------+ |
| /// | len: 5 | len: 5 | hello | ... |
| /// | ptr: 0x.. | ptr: 0x.. | | ... |
| /// +-----------+-----------+-------------+ |
| /// <---12b---> <---12b---> <-----5b----> |
| /// |
| /// The behavior of reads and writes is as follows: |
| /// Read: |
| /// 1. Delete on read (delete_on_read_): Blocks are deleted as we go through the stream. |
| /// The data returned by the tuple stream is valid until the next read call so the |
| /// caller does not need to copy if it is streaming. |
| /// 2. Unpinned: Blocks remain in blocks_ and are unpinned after reading. |
| /// 3. Pinned: Blocks remain in blocks_ and are left pinned after reading. If the next |
| /// block in the stream cannot be pinned, the read call will fail and the caller needs |
| /// to free memory from the underlying block mgr. |
| /// Write: |
| /// 1. Unpinned: Unpin blocks as they fill up. This means only a single (i.e. the |
| /// current) block needs to be in memory regardless of the input size (if read_write is |
| /// true, then two blocks need to be in memory). |
| /// 2. Pinned: Blocks are left pinned. If we run out of blocks, the write will fail and |
| /// the caller needs to free memory from the underlying block mgr. |
| /// |
| /// Memory lifetime of rows read from stream: |
| /// If the stream is pinned, it is valid to access any tuples returned via |
| /// GetNext() or GetRows() until the stream is unpinned. If the stream is unpinned, and |
| /// the batch returned from GetNext() has the needs_deep_copy flag set, any tuple memory |
| /// returned so far from the stream may be freed on the next call to GetNext(). |
| /// |
| /// Manual construction of rows with AllocateRow(): |
| /// The BufferedTupleStream supports allocation of uninitialized rows with AllocateRow(). |
| /// The caller of AllocateRow() is responsible for writing the row with exactly the |
| /// layout described above. |
| /// |
| /// If a caller constructs a tuple in this way, the caller can set the pointers and they |
| /// will not be modified until the stream is read via GetNext() or GetRows(). |
| /// |
| /// TODO: we need to be able to do read ahead in the BufferedBlockMgr. It currently |
| /// only has PinAllBlocks() which is blocking. We need a non-blocking version of this or |
| /// some way to indicate a block will need to be pinned soon. |
| /// TODO: see if this can be merged with Sorter::Run. The key difference is that this |
| /// does not need to return rows in the order they were added, which allows it to be |
| /// simpler. |
| /// TODO: we could compact the small buffers when we need to spill but they use very |
| /// little memory so this might not be very useful. |
| /// TODO: improvements: |
| /// - It would be good to allocate the null indicators at the end of each block and grow |
| /// this array as new rows are inserted in the block. If we do so, then there will be |
| /// fewer gaps in case of many rows with NULL tuples. |
| /// - We will want to multithread this. Add a AddBlock() call so the synchronization |
| /// happens at the block level. This is a natural extension. |
| /// - Instead of allocating all blocks from the block_mgr, allocate some blocks that |
| /// are much smaller (e.g. 16K and doubling up to the block size). This way, very |
| /// small streams (a common case) will use very little memory. These small blocks |
| /// are always in memory since spilling them frees up negligible memory. |
| /// - Return row batches in GetNext() instead of filling one in |
| class BufferedTupleStream { |
| public: |
| /// Ordinal index into the stream to retrieve a row in O(1) time. This index can |
| /// only be used if the stream is pinned. |
| /// To read a row from a stream we need three pieces of information that we squeeze in |
| /// 64 bits: |
| /// - The index of the block. The block id is stored in 16 bits. We can have up to |
| /// 64K blocks per tuple stream. With 8MB blocks that is 512GB per stream. |
| /// - The offset of the start of the row (data) within the block. Since blocks are 8MB |
| /// we use 24 bits for the offsets. (In theory we could use 23 bits.) |
| /// - The idx of the row in the block. We need this for retrieving the null indicators. |
| /// We use 24 bits for this index as well. |
| struct RowIdx { |
| /// Bit layout of 'data', from low to high bits: block id (16 bits), row offset |
| /// (24 bits), row idx (24 bits). Each field has a mask (already shifted into place) |
| /// and the shift that moves the field down to bit 0. |
| static const uint64_t BLOCK_MASK = 0xFFFF; |
| static const uint64_t BLOCK_SHIFT = 0; |
| static const uint64_t OFFSET_MASK = 0xFFFFFF0000; |
| static const uint64_t OFFSET_SHIFT = 16; |
| static const uint64_t IDX_MASK = 0xFFFFFF0000000000; |
| static const uint64_t IDX_SHIFT = 40; |
| |
| /// Index of the block that holds the row. |
| uint64_t block() const { |
| return (data & BLOCK_MASK) >> BLOCK_SHIFT; |
| } |
| |
| /// Offset of the start of the row's data within its block. |
| uint64_t offset() const { |
| return (data & OFFSET_MASK) >> OFFSET_SHIFT; |
| } |
| |
| /// Index of the row within its block. |
| uint64_t idx() const { |
| return (data & IDX_MASK) >> IDX_SHIFT; |
| } |
| |
| /// Packs 'block', 'offset' and 'idx' into 'data' and returns the packed value. Each |
| /// component is DCHECKed against the width of its bit field. All three fields are |
| /// range-checked and shifted the same way so that the encoding stays correct if the |
| /// layout constants above are ever changed. |
| uint64_t set(uint64_t block, uint64_t offset, uint64_t idx) { |
| DCHECK_LE(block, BLOCK_MASK >> BLOCK_SHIFT) |
| << "Cannot have more than 2^16 = 64K blocks in a tuple stream."; |
| DCHECK_LE(offset, OFFSET_MASK >> OFFSET_SHIFT) |
| << "Cannot have blocks larger than 2^24 = 16MB"; |
| DCHECK_LE(idx, IDX_MASK >> IDX_SHIFT) |
| << "Cannot have more than 2^24 = 16M rows in a block."; |
| data = (block << BLOCK_SHIFT) | (offset << OFFSET_SHIFT) | (idx << IDX_SHIFT); |
| return data; |
| } |
| |
| std::string DebugString() const; |
| |
| /// The packed (block, offset, idx) triple. |
| uint64_t data; |
| }; |
| |
| /// row_desc: description of rows stored in the stream. This is the desc for rows |
| /// that are added and the rows being returned. |
| /// block_mgr: Underlying block mgr that owns the data blocks. |
| /// use_initial_small_buffers: If true, the initial N buffers allocated for the |
| /// tuple stream use smaller than IO-sized buffers. |
| /// read_write: Stream allows interchanging read and write operations. Requires that at |
| /// least two blocks can be pinned. |
| /// ext_varlen_slots: set of varlen slots with data stored externally to the stream |
| BufferedTupleStream(RuntimeState* state, const RowDescriptor& row_desc, |
| BufferedBlockMgr* block_mgr, BufferedBlockMgr::Client* client, |
| bool use_initial_small_buffers, bool read_write, |
| const std::set<SlotId>& ext_varlen_slots = std::set<SlotId>()); |
| |
| ~BufferedTupleStream(); |
| |
| /// Initializes the tuple stream object on behalf of node 'node_id'. Must be called |
| /// once before any of the other APIs. |
| /// If 'pinned' is true, the tuple stream starts off pinned, otherwise it is unpinned. |
| /// If 'profile' is non-NULL, counters are created. |
| /// 'node_id' is only used for error reporting. |
| Status Init(int node_id, RuntimeProfile* profile, bool pinned); |
| |
| /// Prepares the stream for writing by attempting to allocate a write block. |
| /// Called after Init() and before the first AddRow() call. |
| /// 'got_buffer': set to true if the first write block was successfully pinned, or |
| /// false if the block could not be pinned and no error was encountered. Undefined |
| /// if an error status is returned. |
| Status PrepareForWrite(bool* got_buffer); |
| |
| /// Must be called for streams using small buffers to switch to IO-sized buffers. |
| /// If it fails to get a buffer (i.e. the switch fails) it resets the use_small_buffers_ |
| /// back to false. |
| /// NOTE(review): "back to false" looks inverted: after a failed switch the stream is |
| /// presumably still using small buffers (use_small_buffers_ == true). Confirm against |
| /// the implementation. |
| /// TODO: IMPALA-3200: remove this when small buffers are removed. |
| Status SwitchToIoBuffers(bool* got_buffer); |
| |
| /// Adds a single row to the stream. Returns true if the append succeeded, returns false |
| /// and sets 'status' to OK if appending failed but can be retried or returns false and |
| /// sets 'status' to an error if an error occurred. |
| /// BufferedTupleStream will do a deep copy of the memory in the row. After AddRow() |
| /// returns an error, it should not be called again. If appending failed without an |
| /// error and the stream is using small buffers, it is valid to call |
| /// SwitchToIoBuffers() then AddRow() again. |
| bool AddRow(TupleRow* row, Status* status) noexcept; |
| |
| /// Allocates space to store a row with fixed length 'fixed_size' and variable |
| /// length data 'varlen_size'. If successful, returns the pointer where fixed length |
| /// data should be stored and assigns 'varlen_data' to where var-len data should |
| /// be stored. Returns NULL if there is not enough memory or an error occurred. |
| /// Sets *status if an error occurred. The returned memory is guaranteed to all |
| /// be allocated in the same block. AllocateRow does not currently support nullable |
| /// tuples. |
| uint8_t* AllocateRow(int fixed_size, int varlen_size, uint8_t** varlen_data, |
| Status* status); |
| |
| /// Populates 'row' with the row at 'idx'. The stream must be pinned. The row must have |
| /// been allocated with the stream's row desc. |
| void GetTupleRow(const RowIdx& idx, TupleRow* row) const; |
| |
| /// Prepares the stream for reading. If read_write_, this can be called at any time to |
| /// begin reading. Otherwise this must be called after the last AddRow() and |
| /// before GetNext(). |
| /// delete_on_read: Blocks are deleted after they are read. |
| /// got_buffer: set to true if the first read block was successfully pinned, or |
| /// false if the block could not be pinned and no error was encountered. |
| Status PrepareForRead(bool delete_on_read, bool* got_buffer); |
| |
| /// Pins all blocks in this stream and switches to pinned mode. |
| /// If there is not enough memory, *pinned is set to false and the stream is unmodified. |
| /// If already_reserved is true, the caller has already made a reservation on |
| /// block_mgr_client_ to pin the stream. |
| Status PinStream(bool already_reserved, bool* pinned); |
| |
| /// Modes for UnpinStream(). |
| enum UnpinMode { |
| /// All blocks in the stream are unpinned and the read/write positions in the stream |
| /// are reset. No more rows can be written to the stream after this. The stream can |
| /// be re-read from the beginning by calling PrepareForRead(). |
| UNPIN_ALL, |
| /// All blocks are unpinned aside from the current read and write blocks (if any), |
| /// which are left in the same state. The unpinned stream can continue being read |
| /// or written from the current read or write positions. |
| UNPIN_ALL_EXCEPT_CURRENT, |
| }; |
| |
| /// Unpins stream with the given 'mode' as described above. |
| Status UnpinStream(UnpinMode mode); |
| |
| /// Get the next batch of output rows. Memory is still owned by the BufferedTupleStream |
| /// and must be copied out by the caller. |
| Status GetNext(RowBatch* batch, bool* eos); |
| |
| /// Same as above, but also populate 'indices' with the index of each returned row. |
| Status GetNext(RowBatch* batch, bool* eos, std::vector<RowIdx>* indices); |
| |
| /// Returns all the rows in the stream in batch. This pins the entire stream in the |
| /// process. |
| /// *got_rows is false if the stream could not be pinned. |
| Status GetRows(boost::scoped_ptr<RowBatch>* batch, bool* got_rows); |
| |
| /// Must be called once at the end to cleanup all resources. If 'batch' is non-NULL, |
| /// attaches any pinned blocks to the batch and deletes unpinned blocks. Otherwise |
| /// deletes all blocks. Does nothing if the stream was already closed. The 'flush' |
| /// mode is forwarded to RowBatch::AddBlock() when attaching blocks. |
| void Close(RowBatch* batch, RowBatch::FlushMode flush); |
| |
| /// Number of rows in the stream. |
| int64_t num_rows() const { return num_rows_; } |
| |
| /// Number of rows returned via GetNext(). |
| int64_t rows_returned() const { return rows_returned_; } |
| |
| /// Returns the byte size necessary to store the entire stream in memory. |
| int64_t byte_size() const { return total_byte_size_; } |
| |
| /// Returns the byte size of the stream that is currently pinned in memory. |
| /// If ignore_current is true, the write_block_ memory is not included. |
| int64_t bytes_in_mem(bool ignore_current) const; |
| |
| /// True if the stream is marked as closed (closed_). |
| bool is_closed() const { return closed_; } |
| /// True if the stream is in pinned mode (pinned_). |
| bool is_pinned() const { return pinned_; } |
| /// Number of pinned blocks in blocks_, not including small blocks. |
| int blocks_pinned() const { return num_pinned_; } |
| /// Number of blocks in blocks_ that are neither counted as pinned nor small blocks. |
| int blocks_unpinned() const { return blocks_.size() - num_pinned_ - num_small_blocks_; } |
| /// True if read_block_ refers to a block in blocks_ (i.e. is not blocks_.end()). |
| bool has_read_block() const { return read_block_ != blocks_.end(); } |
| /// True if there is a current write block (write_block_ is non-NULL). |
| bool has_write_block() const { return write_block_ != NULL; } |
| /// True if this stream is still using small buffers. |
| bool using_small_buffers() const { return use_small_buffers_; } |
| |
| /// Returns true if the row consumes any memory. If false, the stream only needs to |
| /// store the count of rows. |
| bool RowConsumesMemory() const { |
| return fixed_tuple_row_size_ > 0 || has_nullable_tuple_; |
| } |
| |
| std::string DebugString() const; |
| |
| private: |
| /// Test classes that are granted access to the private members below. |
| friend class ArrayTupleStreamTest_TestArrayDeepCopy_Test; |
| friend class ArrayTupleStreamTest_TestComputeRowSize_Test; |
| friend class MultiNullableTupleStreamTest_TestComputeRowSize_Test; |
| friend class SimpleTupleStreamTest_TestGetRowsOverflow_Test; |
| |
| /// Runtime state instance used to check for cancellation. Not owned. |
| RuntimeState* const state_; |
| |
| /// Description of rows stored in the stream. |
| const RowDescriptor& desc_; |
| |
| /// Sum of the fixed length portion of all the tuples in desc_. |
| int fixed_tuple_row_size_; |
| |
| /// The size of the fixed length portion for each tuple in the row. |
| std::vector<int> fixed_tuple_sizes_; |
| |
| /// Max size (in bytes) of null indicators bitmap in the current read and write |
| /// blocks. If 0, it means that there is no need to store null indicators for this |
| /// RowDesc. We calculate this value based on the block's size and the |
| /// fixed_tuple_row_size_. When not 0, this value is also an upper bound for the number |
| /// of (rows * tuples_per_row) in this block. |
| int read_block_null_indicators_size_; |
| int write_block_null_indicators_size_; |
| |
| /// Size (in bytes) of the null indicators bitmap reserved in a block of maximum |
| /// size (i.e. IO block size). 0 if no tuple is nullable. |
| int max_null_indicators_size_; |
| |
| /// Vectors of all the strings slots that have their varlen data stored in stream |
| /// grouped by tuple_idx. |
| std::vector<std::pair<int, std::vector<SlotDescriptor*>>> inlined_string_slots_; |
| |
| /// Vectors of all the collection slots that have their varlen data stored in the |
| /// stream, grouped by tuple_idx. |
| std::vector<std::pair<int, std::vector<SlotDescriptor*>>> inlined_coll_slots_; |
| |
| /// Block manager and client used to allocate, pin and release blocks. Not owned. |
| BufferedBlockMgr* block_mgr_; |
| BufferedBlockMgr::Client* block_mgr_client_; |
| |
| /// List of blocks in the stream. |
| std::list<BufferedBlockMgr::Block*> blocks_; |
| |
| /// Total size of blocks_, including small blocks. |
| int64_t total_byte_size_; |
| |
| /// Iterator pointing to the current block for read. Equal to blocks_.end() until |
| /// PrepareForRead() is called. |
| std::list<BufferedBlockMgr::Block*>::iterator read_block_; |
| |
| /// For each block in the stream, the buffer of the start of the block. This is only |
| /// valid when the stream is pinned, giving random access to data in the stream. |
| /// This is not maintained for delete_on_read_. |
| std::vector<uint8_t*> block_start_idx_; |
| |
| /// Current idx of the tuple read from the read_block_ buffer. |
| uint32_t read_tuple_idx_; |
| |
| /// Current offset in read_block_ of the end of the last data read. |
| uint8_t* read_ptr_; |
| |
| /// Pointer to one byte past the end of read_block_. |
| uint8_t* read_end_ptr_; |
| |
| /// Current idx of the tuple written at the write_block_ buffer. |
| uint32_t write_tuple_idx_; |
| |
| /// Pointer into write_block_ of the end of the last data written. |
| uint8_t* write_ptr_; |
| |
| /// Pointer to one byte past the end of write_block_. |
| uint8_t* write_end_ptr_; |
| |
| /// Number of rows returned to the caller from GetNext(). |
| int64_t rows_returned_; |
| |
| /// The block index of the current read block in blocks_. |
| int read_block_idx_; |
| |
| /// The current block for writing. NULL if there is no available block to write to. |
| /// The entire write_block_ buffer is marked as allocated, so any data written into |
| /// the buffer will be spilled without having to allocate additional space. |
| BufferedBlockMgr::Block* write_block_; |
| |
| /// Number of pinned blocks in blocks_, stored to avoid iterating over the list |
| /// to compute bytes_in_mem and bytes_unpinned. |
| /// This does not include small blocks. |
| int num_pinned_; |
| |
| /// The total number of small blocks in blocks_. |
| int num_small_blocks_; |
| |
| /// Number of rows stored in the stream. |
| int64_t num_rows_; |
| |
| /// Counters added by this object to the parent runtime profile. |
| RuntimeProfile::Counter* pin_timer_; |
| RuntimeProfile::Counter* unpin_timer_; |
| RuntimeProfile::Counter* get_new_block_timer_; |
| |
| /// If true, read and write operations may be interleaved. Otherwise all calls |
| /// to AddRow() must occur before calling PrepareForRead() and subsequent calls to |
| /// GetNext(). |
| const bool read_write_; |
| |
| /// Whether any tuple in the rows is nullable. |
| const bool has_nullable_tuple_; |
| |
| /// If true, this stream is still using small buffers. |
| bool use_small_buffers_; |
| |
| /// If true, blocks are deleted after they are read. |
| bool delete_on_read_; |
| |
| bool closed_; // Used for debugging. |
| |
| /// If true, this stream has been explicitly pinned by the caller. This changes the |
| /// memory management of the stream. The blocks are not unpinned until the caller calls |
| /// UnpinStream(). If false, only the write_block_ and/or read_block_ are pinned |
| /// (both are if read_write_ is true). |
| bool pinned_; |
| |
| /// The slow path for AddRow() that is called if there is not sufficient space in |
| /// the current block. |
| bool AddRowSlow(TupleRow* row, Status* status) noexcept; |
| |
| /// Copies 'row' into write_block_. Returns false if there is not enough space in |
| /// 'write_block_'. After returning false, write_ptr_ may be left pointing to the |
| /// partially-written row, and no more data can be written to write_block_. |
| template <bool HAS_NULLABLE_TUPLE> |
| bool DeepCopyInternal(TupleRow* row) noexcept; |
| |
| /// Helper function to copy strings in string_slots from tuple into write_block_. |
| /// Updates write_ptr_ to the end of the string data added. Returns false if the data |
| /// does not fit in the current write block. After returning false, write_ptr_ is left |
| /// pointing to the partially-written row, and no more data can be written to |
| /// write_block_. |
| bool CopyStrings(const Tuple* tuple, const std::vector<SlotDescriptor*>& string_slots); |
| |
| /// Helper function to deep copy collections in collection_slots from tuple into |
| /// write_block_. Updates write_ptr_ to the end of the collection data added. Returns |
| /// false if the data does not fit in the current write block. After returning false, |
| /// write_ptr_ is left pointing to the partially-written row, and no more data can be |
| /// written to write_block_. |
| bool CopyCollections(const Tuple* tuple, |
| const std::vector<SlotDescriptor*>& collection_slots); |
| |
| /// Wrapper of the templated DeepCopyInternal() function. |
| bool DeepCopy(TupleRow* row) noexcept; |
| |
| /// Gets a new block of 'block_len' bytes from the block_mgr_, updating write_block_, |
| /// write_tuple_idx_, write_ptr_ and write_end_ptr_. 'null_indicators_size' is the |
| /// number of bytes that will be reserved in the block for the null indicators bitmap. |
| /// *got_block is set to true if a block was successfully acquired. Null indicators |
| /// (if any) will also be reserved and initialized. If there are no blocks available, |
| /// *got_block is set to false and write_block_ is unchanged. |
| Status NewWriteBlock( |
| int64_t block_len, int64_t null_indicators_size, bool* got_block) noexcept; |
| |
| /// A wrapper around NewWriteBlock(). 'row_size' is the size of the tuple row to be |
| /// appended to this block. This function determines the block size required in order |
| /// to fit the row and null indicators. |
| Status NewWriteBlockForRow(int64_t row_size, bool* got_block) noexcept; |
| |
| /// Reads the next block from the block_mgr_. This blocks if necessary. |
| /// Updates read_block_, read_ptr_, read_tuple_idx_ and read_end_ptr_. |
| Status NextReadBlock(); |
| |
| /// Returns the total additional bytes that this row will consume in write_block_ if |
| /// appended to the block. This includes the fixed length part of the row and the |
| /// data for inlined_string_slots_ and inlined_coll_slots_. |
| int64_t ComputeRowSize(TupleRow* row) const noexcept; |
| |
| /// Unpins block if it is an IO-sized block and updates tracking stats. |
| Status UnpinBlock(BufferedBlockMgr::Block* block); |
| |
| /// Templated GetNext implementations. |
| template <bool FILL_INDICES> |
| Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<RowIdx>* indices); |
| template <bool FILL_INDICES, bool HAS_NULLABLE_TUPLE> |
| Status GetNextInternal(RowBatch* batch, bool* eos, std::vector<RowIdx>* indices); |
| |
| /// Helper function for GetNextInternal(). For each string slot in string_slots, |
| /// update StringValue's ptr field to point to the corresponding string data stored |
| /// inline in the stream (at the current value of read_ptr_) and advance read_ptr_ by |
| /// the StringValue's length field. |
| void FixUpStringsForRead(const std::vector<SlotDescriptor*>& string_slots, Tuple* tuple); |
| |
| /// Helper function for GetNextInternal(). For each collection slot in collection_slots, |
| /// recursively update any pointers in the CollectionValue to point to the corresponding |
| /// var len data stored inline in the stream, advancing read_ptr_ as data is read. |
| /// Assumes that the collection was serialized to the stream in DeepCopy()'s format. |
| void FixUpCollectionsForRead(const std::vector<SlotDescriptor*>& collection_slots, |
| Tuple* tuple); |
| |
| /// Computes the number of bytes needed for null indicators for a block of 'block_size'. |
| /// Return 0 if no tuple is nullable. Return -1 if a single row of fixed-size tuples |
| /// plus its null indicator (if any) cannot fit in the block. |
| int ComputeNumNullIndicatorBytes(int block_size) const; |
| |
| /// Number of bytes between read_ptr_ and read_end_ptr_, i.e. the part of the current |
| /// read block that lies after the last data read. The narrowing to uint32_t is made |
| /// explicit. NOTE(review): assumes buffer_len() fits in 32 bits; confirm. |
| uint32_t read_block_bytes_remaining() const { |
| DCHECK_GE(read_end_ptr_, read_ptr_); |
| DCHECK_LE(read_end_ptr_ - read_ptr_, (*read_block_)->buffer_len()); |
| return static_cast<uint32_t>(read_end_ptr_ - read_ptr_); |
| } |
| |
| /// Number of bytes between write_ptr_ and write_end_ptr_, i.e. the part of the current |
| /// write block that lies after the last data written. The narrowing to uint32_t is |
| /// made explicit. NOTE(review): assumes buffer_len() fits in 32 bits; confirm. |
| uint32_t write_block_bytes_remaining() const { |
| DCHECK_GE(write_end_ptr_, write_ptr_); |
| DCHECK_LE(write_end_ptr_ - write_ptr_, write_block_->buffer_len()); |
| return static_cast<uint32_t>(write_end_ptr_ - write_ptr_); |
| } |
| |
| }; |
| |
| } |
| |
| #endif |