// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef IMPALA_RUNTIME_ROW_BATCH_H
#define IMPALA_RUNTIME_ROW_BATCH_H
#include <vector>
#include <cstring>
#include <boost/scoped_ptr.hpp>
#include "codegen/impala-ir.h"
#include "common/compiler-util.h"
#include "common/logging.h"
#include "runtime/buffered-block-mgr.h" // for BufferedBlockMgr::Block
#include "runtime/descriptors.h"
#include "runtime/disk-io-mgr.h"
#include "runtime/mem-pool.h"
namespace impala {
template <typename K, typename V> class FixedSizeHashTable;
class MemTracker;
class RowBatchSerializeTest;
class RuntimeState;
class TRowBatch;
class Tuple;
class TupleRow;
class TupleDescriptor;
/// A RowBatch encapsulates a batch of rows, each composed of a number of tuples.
/// The maximum number of rows is fixed at the time of construction.
/// A row batch references a few different sources of memory.
/// 1. TupleRow ptrs - may be malloc'd and owned by the RowBatch or allocated from
/// the tuple pool, depending on whether legacy joins and aggs are enabled.
/// See the comment on tuple_ptrs_ for more details.
/// 2. Tuple memory - this is allocated from (or transferred to) the row batch's
/// tuple pool.
/// 3. Auxiliary tuple memory (e.g. string data) - this can either be stored
/// externally (strings are not copied) or in the tuple pool (strings are copied).
/// If external, the data is in an io buffer that may not be attached to this row
/// batch. The creator of that row batch must ensure that the io buffer is not
/// recycled until all batches that reference the memory have been consumed.
/// In order to minimize memory allocations, RowBatches and TRowBatches that have been
/// serialized and sent over the wire should be reused (this prevents compression_scratch_
/// from being needlessly reallocated).
//
/// Row batches and memory usage: We attempt to stream row batches through the plan
/// tree without copying the data. This means that row batches are often not compact
/// and reference memory outside of the row batch. As a result, most row batches have
/// a very small memory footprint, while some have a very large one (they contain all
/// the memory that other row batches are referencing). An example is io buffers, which
/// are attached to only one row batch. Only when the row batch reaches
/// a blocking operator or the root of the fragment is the row batch memory freed.
/// This means that in some cases (e.g. very selective queries), we still need to
/// pass the row batch through the exec nodes (even if they have no rows) to trigger
/// memory deletion. AtCapacity() encapsulates the check that we are not accumulating
/// excessive memory.
//
/// A row batch is considered at capacity if all the rows are full or it has accumulated
/// auxiliary memory up to a soft cap. (See the comment on AT_CAPACITY_MEM_USAGE.)
class RowBatch {
public:
/// Flag indicating whether the resources attached to a RowBatch need to be flushed.
/// Defined here as a convenience for other modules that need to communicate flushing
/// modes.
enum class FlushMode {
FLUSH_RESOURCES,
NO_FLUSH_RESOURCES,
};
/// Create RowBatch for a maximum of 'capacity' rows of tuples specified
/// by 'row_desc'.
/// 'tracker' cannot be NULL.
RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* tracker);
/// Populate a row batch from input_batch by copying input_batch's
/// tuple_data into the row batch's mempool and converting all offsets
/// in the data back into pointers.
/// TODO: figure out how to transfer the data from input_batch to this RowBatch
/// (so that we don't need to make yet another copy)
RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
MemTracker* tracker);
/// Releases all resources accumulated at this row batch. This includes
/// - tuple_ptrs
/// - tuple mem pool data
/// - buffer handles from the io mgr
~RowBatch();
/// AddRows() is called before adding rows to the batch. Returns the index of the next
/// row to be added. The caller is responsible for ensuring there is enough remaining
/// capacity: it is invalid to call AddRows when num_rows_ + n > capacity_. The rows
/// are uninitialized and each tuple of the row must be set, after which CommitRows()
/// can be called to update num_rows_. Two consecutive AddRow() calls without a
/// CommitRows() call between them have the same effect as a single call.
int ALWAYS_INLINE AddRows(int n) {
DCHECK_LE(num_rows_ + n, capacity_);
return num_rows_;
}
int ALWAYS_INLINE AddRow() { return AddRows(1); }
void ALWAYS_INLINE CommitRows(int n) {
DCHECK_GE(n, 0);
DCHECK_LE(num_rows_ + n, capacity_);
num_rows_ += n;
}
void ALWAYS_INLINE CommitLastRow() { CommitRows(1); }
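/// Illustrative sketch of the add/commit protocol ('batch' and 'PopulateRow' are
/// hypothetical names, not part of this API); the caller has already checked that
/// n rows fit:
///
///   int first_row = batch->AddRows(n);   // Index of the first reserved row.
///   for (int i = 0; i < n; ++i) {
///     TupleRow* row = batch->GetRow(first_row + i);
///     PopulateRow(row);                  // Set every tuple ptr of the row.
///   }
///   batch->CommitRows(n);                // Publish the rows by updating num_rows_.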
/// Sets the number of rows in the batch. Can only be used to reduce the number of
/// rows. This is only used in the limit case where more rows were added than necessary.
void set_num_rows(int num_rows) {
DCHECK_LE(num_rows, num_rows_);
DCHECK_GE(num_rows, 0);
num_rows_ = num_rows;
}
/// Returns true if the row batch has filled rows up to its capacity or has accumulated
/// enough memory. The memory calculation includes the tuple data pool and any
/// auxiliary memory attached to the row batch.
bool ALWAYS_INLINE AtCapacity() {
DCHECK_LE(num_rows_, capacity_);
// Check AtCapacity() condition enforced in MarkNeedsDeepCopy() and
// MarkFlushResources().
DCHECK((!needs_deep_copy_ && flush_ == FlushMode::NO_FLUSH_RESOURCES)
|| num_rows_ == capacity_);
int64_t mem_usage = auxiliary_mem_usage_ + tuple_data_pool_.total_allocated_bytes();
return num_rows_ == capacity_ || mem_usage >= AT_CAPACITY_MEM_USAGE;
}
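/// A typical produce loop, sketched below ('batch', 'HaveMoreInput' and 'FillRow'
/// are hypothetical):
///
///   while (!batch->AtCapacity() && HaveMoreInput()) {
///     TupleRow* row = batch->GetRow(batch->AddRow());
///     FillRow(row);              // Set every tuple of the row.
///     batch->CommitLastRow();
///   }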
TupleRow* ALWAYS_INLINE GetRow(int row_idx) {
DCHECK(tuple_ptrs_ != NULL);
DCHECK_GE(row_idx, 0);
DCHECK_LT(row_idx, capacity_);
return reinterpret_cast<TupleRow*>(tuple_ptrs_ + row_idx * num_tuples_per_row_);
}
/// An iterator for going through a row batch, starting at 'row_idx'.
/// If 'limit' is specified, it will iterate up to row number 'row_idx + limit'
/// or the last row, whichever comes first. Otherwise, it will iterate until the last
/// row in the batch. This is more efficient than using GetRow() as it avoids reloading
/// the row batch state and doing a multiplication on each iteration.
class Iterator {
public:
Iterator(RowBatch* parent, int row_idx, int limit = -1) :
num_tuples_per_row_(parent->num_tuples_per_row_),
row_(parent->tuple_ptrs_ + num_tuples_per_row_ * row_idx),
row_batch_end_(parent->tuple_ptrs_ + num_tuples_per_row_ *
(limit == -1 ? parent->num_rows_ :
std::min<int>(row_idx + limit, parent->num_rows_))),
parent_(parent) {
DCHECK_GE(row_idx, 0);
DCHECK_GT(num_tuples_per_row_, 0);
/// We allow empty row batches with num_rows_ == capacity_ == 0.
/// That's why we cannot call GetRow() above to initialize 'row_'.
DCHECK_LE(row_idx, parent->capacity_);
}
/// Return the current row pointed to by the row pointer.
TupleRow* IR_ALWAYS_INLINE Get() { return reinterpret_cast<TupleRow*>(row_); }
/// Increment the row pointer and return the next row.
TupleRow* IR_ALWAYS_INLINE Next() {
row_ += num_tuples_per_row_;
DCHECK_LE((row_ - parent_->tuple_ptrs_) / num_tuples_per_row_, parent_->capacity_);
return Get();
}
/// Returns true if the iterator is at or beyond the last row to read. Useful for
/// read iterators to detect the end of iteration. Write iterators should use
/// RowBatch::AtCapacity() instead.
bool IR_ALWAYS_INLINE AtEnd() { return row_ >= row_batch_end_; }
/// Returns the row batch which this iterator is iterating through.
RowBatch* parent() { return parent_; }
private:
/// Number of tuples per row.
const int num_tuples_per_row_;
/// Pointer to the current row.
Tuple** row_;
/// Pointer to the row after the last row for read iterators.
Tuple** const row_batch_end_;
/// The row batch being iterated on.
RowBatch* const parent_;
};
int num_tuples_per_row() { return num_tuples_per_row_; }
int row_byte_size() { return num_tuples_per_row_ * sizeof(Tuple*); }
MemPool* tuple_data_pool() { return &tuple_data_pool_; }
int num_io_buffers() const { return io_buffers_.size(); }
int num_blocks() const { return blocks_.size(); }
/// Resets the row batch, returning all resources it has accumulated.
void Reset();
/// Adds an io buffer to this row batch.
void AddIoBuffer(DiskIoMgr::BufferDescriptor* buffer);
/// Adds a block to this row batch. The block must be pinned. The blocks must be
/// deleted when freeing resources. The block's memory remains accounted against
/// the original owner, even when the ownership of batches is transferred. If the
/// original owner wants the memory to be released, it should call this with 'mode'
/// FLUSH_RESOURCES (see MarkFlushResources() for further explanation).
/// TODO: after IMPALA-3200, make the ownership transfer model consistent between
/// Blocks and I/O buffers.
void AddBlock(BufferedBlockMgr::Block* block, FlushMode flush);
/// Used by an operator to indicate that it cannot produce more rows until the
/// resources that it has attached to the row batch are freed or acquired by an
/// ancestor operator. After this is called, the batch is at capacity and no more rows
/// can be added. The "flush" mark is transferred by TransferResourceOwnership(). This
/// ensures that batches are flushed by streaming operators all the way up the operator
/// tree. Blocking operators can still accumulate batches with this flag.
/// TODO: IMPALA-3200: blocking operators should acquire all memory resources including
/// attached blocks/buffers, so that MarkFlushResources() can guarantee that the
/// resources will not be accounted against the original operator (this is currently
/// not true for Blocks, which can't be transferred).
void MarkFlushResources() {
DCHECK_LE(num_rows_, capacity_);
capacity_ = num_rows_;
flush_ = FlushMode::FLUSH_RESOURCES;
}
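/// For example, an operator that needs the memory of an attached block back soon
/// might do the following (sketch; 'batch' and 'block' are hypothetical):
///
///   batch->AddBlock(block, RowBatch::FlushMode::FLUSH_RESOURCES);
///   // Passing FLUSH_RESOURCES marks the batch at capacity, so it is returned
///   // up the operator tree and the block's memory is freed promptly.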
/// Called to indicate that some resources backing this batch were not attached and
/// will be cleaned up after the next GetNext() call. This means that the batch must
/// be returned up the operator tree. Blocking operators must deep-copy any rows from
/// this batch or preceding batches.
///
/// This is a stronger version of MarkFlushResources(), because blocking operators
/// are not allowed to accumulate batches with the 'needs_deep_copy' flag.
/// TODO: IMPALA-4179: always attach backing resources and remove this flag.
void MarkNeedsDeepCopy() {
MarkFlushResources(); // No more rows should be added to the batch.
needs_deep_copy_ = true;
}
bool needs_deep_copy() { return needs_deep_copy_; }
/// Transfer ownership of resources to dest. This includes tuple data in mem
/// pool and io buffers.
void TransferResourceOwnership(RowBatch* dest);
void CopyRow(TupleRow* src, TupleRow* dest) {
memcpy(dest, src, num_tuples_per_row_ * sizeof(Tuple*));
}
/// Copy 'num_rows' rows from 'src' to 'dest' within the batch. Useful for exec
/// nodes that skip an offset and have copied more rows than necessary; see the
/// sketch below.
void CopyRows(int dest, int src, int num_rows) {
DCHECK_LE(dest, src);
DCHECK_LE(src + num_rows, capacity_);
memmove(tuple_ptrs_ + num_tuples_per_row_ * dest,
tuple_ptrs_ + num_tuples_per_row_ * src,
num_rows * num_tuples_per_row_ * sizeof(Tuple*));
}
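/// For example, to drop the first 'offset' rows after over-producing (sketch;
/// 'batch' and 'offset' are hypothetical):
///
///   int remaining = batch->num_rows() - offset;
///   batch->CopyRows(0, offset, remaining);
///   batch->set_num_rows(remaining);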
void ClearRow(TupleRow* row) {
memset(row, 0, num_tuples_per_row_ * sizeof(Tuple*));
}
/// Acquires state from the 'src' row batch into this row batch. This includes all IO
/// buffers and tuple data.
/// This row batch must be empty and have the same row descriptor as the src batch.
/// This is used for scan nodes which produce RowBatches asynchronously. Typically,
/// an ExecNode is handed a row batch to populate (pull model) but ScanNodes have
/// multiple threads which push row batches.
void AcquireState(RowBatch* src);
/// Deep copies all rows of this row batch into dst, using memory allocated from
/// dst's tuple_data_pool_. Only valid when dst is empty.
/// TODO: the current implementation of deep copy can produce an oversized
/// row batch if there are duplicate tuples in this row batch.
void DeepCopyTo(RowBatch* dst);
/// Create a serialized version of this row batch in output_batch, attaching all of the
/// data it references to output_batch.tuple_data. This function attempts to
/// detect duplicate tuples in the row batch to reduce the serialized size.
/// output_batch.tuple_data will be snappy-compressed unless the compressed data is
/// larger than the uncompressed data. Use output_batch.is_compressed to determine
/// whether tuple_data is compressed. If an in-flight row is present in this row batch,
/// it is ignored. This function does not Reset().
Status Serialize(TRowBatch* output_batch);
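/// Sender-side reuse sketch ('batch', 'GetNextBatch' and 'SendOverWire' are
/// hypothetical); reusing 'thrift_batch' across calls avoids reallocating
/// compression_scratch_, per the note at the top of this file:
///
///   TRowBatch thrift_batch;    // Reused across iterations.
///   while (GetNextBatch(&batch)) {
///     RETURN_IF_ERROR(batch->Serialize(&thrift_batch));
///     SendOverWire(thrift_batch);
///   }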
/// Utility function: returns total size of batch.
static int GetBatchSize(const TRowBatch& batch);
int ALWAYS_INLINE num_rows() const { return num_rows_; }
int ALWAYS_INLINE capacity() const { return capacity_; }
// The maximum value that capacity_ ever took, before MarkFlushResources() might have
// reduced it.
int ALWAYS_INLINE InitialCapacity() const {
return tuple_ptrs_size_ / (num_tuples_per_row_ * sizeof(Tuple*));
}
const RowDescriptor& row_desc() const { return row_desc_; }
/// Max memory that this row batch can accumulate before it is considered at capacity.
/// This is a soft capacity: row batches may exceed the capacity, preferably only by a
/// row's worth of data.
static const int AT_CAPACITY_MEM_USAGE = 8 * 1024 * 1024;
// Max memory out of AT_CAPACITY_MEM_USAGE that should be used for fixed-length data,
// in order to leave room for variable-length data.
static const int FIXED_LEN_BUFFER_LIMIT = AT_CAPACITY_MEM_USAGE / 2;
/// Allocates a buffer large enough for the fixed-length portion of 'capacity_' rows in
/// this batch from 'tuple_data_pool_'. 'capacity_' is reduced if the allocation would
/// exceed FIXED_LEN_BUFFER_LIMIT. Always returns enough space for at least one row.
/// Returns Status::MEM_LIMIT_EXCEEDED and sets 'buffer' to NULL if a memory limit would
/// have been exceeded. 'state' is used to log the error.
/// On success, sets 'buffer_size' to the size in bytes and 'buffer' to the buffer.
Status ResizeAndAllocateTupleBuffer(RuntimeState* state, int64_t* buffer_size,
uint8_t** buffer);
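/// Sketch of typical use when materializing fixed-length tuples ('state' is the
/// RuntimeState; 'batch' is a hypothetical name):
///
///   int64_t buffer_size;
///   uint8_t* buffer;
///   RETURN_IF_ERROR(
///       batch->ResizeAndAllocateTupleBuffer(state, &buffer_size, &buffer));
///   // Lay out up to batch->capacity() fixed-length tuples in 'buffer', point
///   // each row's tuple ptrs at them, then CommitRows().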
/// Helper function to log the batch's rows if VLOG_ROW is enabled. 'context' is a
/// string to prepend to the log message.
void VLogRows(const std::string& context);
private:
friend class RowBatchSerializeBaseline;
friend class RowBatchSerializeBenchmark;
friend class RowBatchSerializeTest;
/// Decide whether to do full tuple deduplication based on row composition. Full
/// deduplication is enabled only when there is risk of the serialized size being
/// much larger than in-memory size due to non-adjacent duplicate tuples.
bool UseFullDedup();
/// Overload for testing that allows the test to force the deduplication level.
Status Serialize(TRowBatch* output_batch, bool full_dedup);
typedef FixedSizeHashTable<Tuple*, int> DedupMap;
/// The total size of all data represented in this row batch (tuples and referenced
/// string and collection data). This is the size of the row batch after removing all
/// gaps in the auxiliary and deduplicated tuples (i.e. the smallest footprint for the
/// row batch). If the distinct_tuples argument is non-null, full deduplication is
/// enabled. The distinct_tuples map must be empty.
int64_t TotalByteSize(DedupMap* distinct_tuples);
void SerializeInternal(int64_t size, DedupMap* distinct_tuples,
TRowBatch* output_batch);
/// All members below need to be handled in RowBatch::AcquireState()
// Class members that are accessed on performance-critical paths should appear
// at the top to fit in as few cache lines as possible.
int num_rows_; // # of committed rows
int capacity_; // the value of num_rows_ at which batch is considered full.
/// If FLUSH_RESOURCES, the resources attached to this batch should be freed or
/// acquired by a new owner as soon as possible. See MarkFlushResources(). If
/// FLUSH_RESOURCES, AtCapacity() is also true.
FlushMode flush_;
/// If true, this batch references unowned memory that will be cleaned up soon.
/// See MarkNeedsDeepCopy(). If true, 'flush_' is FLUSH_RESOURCES and
/// AtCapacity() is true.
bool needs_deep_copy_;
const int num_tuples_per_row_;
/// Array of pointers with InitialCapacity() * num_tuples_per_row_ elements.
/// The memory ownership depends on whether legacy joins and aggs are enabled.
///
/// Memory is malloc'd and owned by RowBatch:
/// If enable_partitioned_hash_join=true and enable_partitioned_aggregation=true
/// then the memory is owned by this RowBatch and is freed upon its destruction.
/// This mode is more performant especially with SubplanNodes in the ExecNode tree
/// because the tuple pointers are not transferred and do not have to be re-created
/// in every Reset().
///
/// Memory is allocated from MemPool:
/// Otherwise, the memory is allocated from tuple_data_pool_. As a result, the
/// pointer memory is transferred just like tuple data, and must be re-created
/// in Reset(). This mode is required for the legacy join and agg which rely on
/// the tuple pointers being allocated from the tuple_data_pool_, so they can
/// acquire ownership of the tuple pointers.
int tuple_ptrs_size_;
Tuple** tuple_ptrs_;
/// Sum of all auxiliary bytes. This includes IoBuffers and memory from
/// TransferResourceOwnership().
int64_t auxiliary_mem_usage_;
/// Holds (some of the) data referenced by rows.
MemPool tuple_data_pool_;
// Less frequently used members that are not accessed on performance-critical paths
// should go below here.
/// Full row descriptor for rows in this batch.
RowDescriptor row_desc_;
MemTracker* mem_tracker_; // not owned
/// IO buffers currently owned by this row batch. Ownership of IO buffers transfers
/// between row batches. Any IO buffer will be owned by at most one row batch
/// (i.e. they are not ref counted), so most row batches don't own any.
std::vector<DiskIoMgr::BufferDescriptor*> io_buffers_;
/// Blocks attached to this row batch. The underlying memory and block manager client
/// are owned by the BufferedBlockMgr.
std::vector<BufferedBlockMgr::Block*> blocks_;
/// String to write compressed tuple data to in Serialize().
/// This is a string so we can swap() with the string in the TRowBatch we're serializing
/// to (we don't compress directly into the TRowBatch in case the compressed data is
/// longer than the uncompressed data). Swapping avoids copying data to the TRowBatch and
/// avoids excess memory allocations: since we reuse RowBatches and TRowBatches, and
/// assuming all row batches are roughly the same size, all strings will eventually be
/// allocated to the right size.
std::string compression_scratch_;
};
}
/// Macros for iterating through '_row_batch', starting at '_start_row_idx'.
/// '_row_batch' is the row batch to iterate through.
/// '_start_row_idx' is the starting row index.
/// '_iter' is the iterator.
/// '_limit' is the max number of rows to iterate over (FOREACH_ROW_LIMIT only).
#define FOREACH_ROW(_row_batch, _start_row_idx, _iter) \
for (RowBatch::Iterator _iter(_row_batch, _start_row_idx); \
!_iter.AtEnd(); _iter.Next())
#define FOREACH_ROW_LIMIT(_row_batch, _start_row_idx, _limit, _iter) \
for (RowBatch::Iterator _iter(_row_batch, _start_row_idx, _limit); \
!_iter.AtEnd(); _iter.Next())
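/// Example usage of the macros (sketch; 'batch' and 'ProcessRow' are hypothetical):
///
///   FOREACH_ROW(batch, 0, iter) {
///     ProcessRow(iter.Get());
///   }
///   FOREACH_ROW_LIMIT(batch, 0, 10, iter) {
///     ProcessRow(iter.Get());  // Visits at most the first 10 rows.
///   }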
#endif