// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/row-batch.h"
#include <stdint.h> // for intptr_t
#include <memory>
#include <boost/scoped_ptr.hpp>
#include "runtime/exec-env.h"
#include "runtime/mem-tracker.h"
#include "runtime/string-value.h"
#include "runtime/tuple-row.h"
#include "util/compress.h"
#include "util/debug-util.h"
#include "util/decompress.h"
#include "util/fixed-size-hash-table.h"
#include "util/scope-exit-trigger.h"
#include "gen-cpp/Results_types.h"
#include "gen-cpp/row_batch.pb.h"
#include "common/names.h"
namespace impala {
const int RowBatch::AT_CAPACITY_MEM_USAGE;
const int RowBatch::FIXED_LEN_BUFFER_LIMIT;
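// Creates an empty batch that can hold 'capacity' rows. The tuple pointer array is
// malloc'd here and its size is charged to 'mem_tracker'.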
RowBatch::RowBatch(const RowDescriptor* row_desc, int capacity, MemTracker* mem_tracker)
: num_rows_(0),
capacity_(capacity),
flush_mode_(FlushMode::NO_FLUSH_RESOURCES),
needs_deep_copy_(false),
num_tuples_per_row_(row_desc->tuple_descriptors().size()),
tuple_ptrs_size_(capacity * num_tuples_per_row_ * sizeof(Tuple*)),
attached_buffer_bytes_(0),
tuple_data_pool_(mem_tracker),
row_desc_(row_desc),
mem_tracker_(mem_tracker) {
  DCHECK(mem_tracker_ != nullptr);
DCHECK_GT(capacity, 0);
DCHECK_GT(tuple_ptrs_size_, 0);
// TODO: switch to Init() pattern so we can check memory limit and return Status.
mem_tracker_->Consume(tuple_ptrs_size_);
tuple_ptrs_ = reinterpret_cast<Tuple**>(malloc(tuple_ptrs_size_));
  DCHECK(tuple_ptrs_ != nullptr);
}
// TODO: we want our input_batch's tuple_data to come from our (not yet implemented)
// global runtime memory segment; how do we get thrift to allocate it from there?
// maybe change line (in Data_types.cc generated from Data.thrift)
// xfer += iprot->readString(this->tuple_data[_i9]);
// to allocated string data in special mempool
// (change via python script that runs over Data_types.cc)
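// Creates a row batch by deserializing a Thrift-serialized 'input_batch': the tuple
// pointer array is malloc'd and the tuple data is copied (LZ4-decompressed if needed)
// into tuple_data_pool_.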
RowBatch::RowBatch(
const RowDescriptor* row_desc, const TRowBatch& input_batch, MemTracker* mem_tracker)
: num_rows_(input_batch.num_rows),
capacity_(input_batch.num_rows),
flush_mode_(FlushMode::NO_FLUSH_RESOURCES),
needs_deep_copy_(false),
num_tuples_per_row_(input_batch.row_tuples.size()),
tuple_ptrs_size_(capacity_ * num_tuples_per_row_ * sizeof(Tuple*)),
attached_buffer_bytes_(0),
tuple_data_pool_(mem_tracker),
row_desc_(row_desc),
mem_tracker_(mem_tracker) {
DCHECK(mem_tracker_ != nullptr);
DCHECK_EQ(num_tuples_per_row_, row_desc_->tuple_descriptors().size());
DCHECK_GT(tuple_ptrs_size_, 0);
kudu::Slice input_tuple_data =
kudu::Slice(input_batch.tuple_data.c_str(), input_batch.tuple_data.size());
kudu::Slice input_tuple_offsets = kudu::Slice(
reinterpret_cast<const char*>(input_batch.tuple_offsets.data()),
input_batch.tuple_offsets.size() * sizeof(int32_t));
const THdfsCompression::type& compression_type = input_batch.compression_type;
DCHECK(compression_type == THdfsCompression::NONE ||
compression_type == THdfsCompression::LZ4)
<< "Unexpected compression type: " << input_batch.compression_type;
mem_tracker_->Consume(tuple_ptrs_size_);
tuple_ptrs_ = reinterpret_cast<Tuple**>(malloc(tuple_ptrs_size_));
DCHECK(tuple_ptrs_ != nullptr) << "Failed to allocate tuple pointers";
const uint64_t uncompressed_size = input_batch.uncompressed_size;
uint8_t* tuple_data = tuple_data_pool_.Allocate(uncompressed_size);
DCHECK(tuple_data != nullptr) << "Failed to allocate tuple data";
Deserialize(input_tuple_offsets, input_tuple_data, uncompressed_size,
compression_type == THdfsCompression::LZ4, tuple_data);
}
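// Creates an empty batch from a deserialized RowBatchHeaderPB. Only the row layout is
// set up here; the tuple pointer array and tuple data are allocated in FromProtobuf().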
RowBatch::RowBatch(const RowDescriptor* row_desc, const RowBatchHeaderPB& header,
MemTracker* mem_tracker)
: num_rows_(0),
capacity_(0),
flush_mode_(FlushMode::NO_FLUSH_RESOURCES),
needs_deep_copy_(false),
num_tuples_per_row_(header.num_tuples_per_row()),
tuple_ptrs_size_(header.num_rows() * num_tuples_per_row_ * sizeof(Tuple*)),
attached_buffer_bytes_(0),
tuple_data_pool_(mem_tracker),
row_desc_(row_desc),
mem_tracker_(mem_tracker) {
DCHECK(mem_tracker_ != nullptr);
DCHECK_EQ(num_tuples_per_row_, row_desc_->tuple_descriptors().size());
DCHECK_GT(tuple_ptrs_size_, 0);
}
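// Fills tuple_ptrs_ and 'tuple_data' from the serialized tuple offsets and tuple data,
// LZ4-decompressing the data if 'is_compressed' is true, then converts any
// variable-length slot offsets in the tuples back into pointers.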
void RowBatch::Deserialize(const kudu::Slice& input_tuple_offsets,
const kudu::Slice& input_tuple_data, int64_t uncompressed_size,
bool is_compressed, uint8_t* tuple_data) {
DCHECK(tuple_ptrs_ != nullptr);
DCHECK(tuple_data != nullptr);
if (is_compressed) {
// Decompress tuple data into data pool
const uint8_t* compressed_data = input_tuple_data.data();
size_t compressed_size = input_tuple_data.size();
Lz4Decompressor decompressor(nullptr, false);
Status status = decompressor.Init();
DCHECK(status.ok()) << status.GetDetail();
auto compressor_cleanup =
MakeScopeExitTrigger([&decompressor]() { decompressor.Close(); });
status = decompressor.ProcessBlock(
true, compressed_size, compressed_data, &uncompressed_size, &tuple_data);
DCHECK_NE(uncompressed_size, -1) << "RowBatch decompression failed";
DCHECK(status.ok()) << "RowBatch decompression failed.";
} else {
    // Tuple data is uncompressed; copy it directly into the data pool.
DCHECK_EQ(uncompressed_size, input_tuple_data.size());
memcpy(tuple_data, input_tuple_data.data(), input_tuple_data.size());
}
  // Convert the serialized tuple offsets into pointers into 'tuple_data'.
const int32_t* tuple_offsets =
reinterpret_cast<const int32_t*>(input_tuple_offsets.data());
DCHECK_EQ(input_tuple_offsets.size() % sizeof(int32_t), 0);
int num_tuples = input_tuple_offsets.size() / sizeof(int32_t);
for (int tuple_idx = 0; tuple_idx < num_tuples; ++tuple_idx) {
int32_t offset = tuple_offsets[tuple_idx];
if (offset == -1) {
tuple_ptrs_[tuple_idx] = nullptr;
} else {
tuple_ptrs_[tuple_idx] = reinterpret_cast<Tuple*>(tuple_data + offset);
}
}
// Check whether we have slots that require offset-to-pointer conversion.
if (!row_desc_->HasVarlenSlots()) return;
  // For every unique tuple, convert string offsets contained in tuple data into
  // pointers. Tuples were serialized in the same order that we deserialize them in,
  // so the first occurrence of a tuple will always have a higher offset than any
  // tuple we have already converted.
Tuple* last_converted = nullptr;
for (int i = 0; i < num_rows_; ++i) {
for (int j = 0; j < num_tuples_per_row_; ++j) {
const TupleDescriptor* desc = row_desc_->tuple_descriptors()[j];
if (!desc->HasVarlenSlots()) continue;
Tuple* tuple = GetRow(i)->GetTuple(j);
// Handle NULL or already converted tuples with one check.
if (tuple <= last_converted) continue;
last_converted = tuple;
tuple->ConvertOffsetsToPointers(*desc, tuple_data);
}
}
}
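// Builds a row batch from its protobuf representation (header plus tuple offset and
// tuple data slices), allocating the tuple pointer array and tuple data as buffer pool
// buffers owned by 'client'.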
Status RowBatch::FromProtobuf(const RowDescriptor* row_desc,
const RowBatchHeaderPB& header, const kudu::Slice& input_tuple_offsets,
const kudu::Slice& input_tuple_data, MemTracker* mem_tracker,
BufferPool::ClientHandle* client, unique_ptr<RowBatch>* row_batch_ptr) {
unique_ptr<RowBatch> row_batch(new RowBatch(row_desc, header, mem_tracker));
DCHECK(client != nullptr);
row_batch->tuple_ptrs_info_.reset(new BufferInfo());
row_batch->tuple_ptrs_info_->client = client;
BufferPool::BufferHandle* tuple_ptrs_buffer = &(row_batch->tuple_ptrs_info_->buffer);
RETURN_IF_ERROR(
row_batch->AllocateBuffer(client, row_batch->tuple_ptrs_size_, tuple_ptrs_buffer));
row_batch->tuple_ptrs_ = reinterpret_cast<Tuple**>(tuple_ptrs_buffer->data());
const int64_t uncompressed_size = header.uncompressed_size();
BufferPool::BufferHandle tuple_data_buffer;
RETURN_IF_ERROR(
row_batch->AllocateBuffer(client, uncompressed_size, &tuple_data_buffer));
uint8_t* tuple_data = tuple_data_buffer.data();
row_batch->AddBuffer(client, move(tuple_data_buffer), FlushMode::NO_FLUSH_RESOURCES);
row_batch->num_rows_ = header.num_rows();
row_batch->capacity_ = header.num_rows();
const CompressionType& compression_type = header.compression_type();
DCHECK(compression_type == CompressionType::NONE ||
compression_type == CompressionType::LZ4)
<< "Unexpected compression type: " << compression_type;
row_batch->Deserialize(input_tuple_offsets, input_tuple_data, uncompressed_size,
compression_type == CompressionType::LZ4, tuple_data);
*row_batch_ptr = std::move(row_batch);
return Status::OK();
}
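// Frees tuple data, attached buffers and the tuple pointer array (either a buffer pool
// buffer or malloc'd memory, depending on how the batch was constructed).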
RowBatch::~RowBatch() {
tuple_data_pool_.FreeAll();
FreeBuffers();
if (tuple_ptrs_info_.get() != nullptr) {
ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
tuple_ptrs_info_->client, &(tuple_ptrs_info_->buffer));
} else {
DCHECK(tuple_ptrs_ != nullptr);
free(tuple_ptrs_);
mem_tracker_->Release(tuple_ptrs_size_);
}
tuple_ptrs_ = nullptr;
}
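// Serializes into a TRowBatch (the Thrift wire format), using full deduplication only
// when UseFullDedup() says it is worthwhile.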
Status RowBatch::Serialize(TRowBatch* output_batch) {
return Serialize(output_batch, UseFullDedup());
}
Status RowBatch::Serialize(TRowBatch* output_batch, bool full_dedup) {
// why does Thrift not generate a Clear() function?
output_batch->row_tuples.clear();
output_batch->tuple_offsets.clear();
int64_t uncompressed_size;
bool is_compressed;
RETURN_IF_ERROR(Serialize(full_dedup, &output_batch->tuple_offsets,
&output_batch->tuple_data, &uncompressed_size, &is_compressed));
// TODO: max_size() is much larger than the amount of memory we could feasibly
// allocate. Need better way to detect problem.
DCHECK_LE(uncompressed_size, output_batch->tuple_data.max_size());
output_batch->__set_num_rows(num_rows_);
output_batch->__set_uncompressed_size(uncompressed_size);
output_batch->__set_compression_type(
is_compressed ? THdfsCompression::LZ4 : THdfsCompression::NONE);
row_desc_->ToThrift(&output_batch->row_tuples);
return Status::OK();
}
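// Serializes into an OutboundRowBatch: fills the tuple offsets, tuple data and the
// protobuf RowBatchHeaderPB of 'output_batch'.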
Status RowBatch::Serialize(OutboundRowBatch* output_batch) {
int64_t uncompressed_size;
bool is_compressed;
output_batch->tuple_offsets_.clear();
RETURN_IF_ERROR(Serialize(UseFullDedup(), &output_batch->tuple_offsets_,
&output_batch->tuple_data_, &uncompressed_size, &is_compressed));
// Initialize the RowBatchHeaderPB
RowBatchHeaderPB* header = &output_batch->header_;
header->Clear();
header->set_num_rows(num_rows_);
header->set_num_tuples_per_row(row_desc_->tuple_descriptors().size());
header->set_uncompressed_size(uncompressed_size);
header->set_compression_type(
is_compressed ? CompressionType::LZ4 : CompressionType::NONE);
return Status::OK();
}
Status RowBatch::Serialize(bool full_dedup, vector<int32_t>* tuple_offsets,
string* tuple_data, int64_t* uncompressed_size, bool* is_compressed) {
  // As part of the serialization process we deduplicate tuples to avoid serializing a
  // Tuple multiple times for the RowBatch. By default we only detect duplicate tuples
  // in adjacent rows. If full deduplication is enabled, we build a map to detect
  // non-adjacent duplicates. Building this map comes with significant overhead, so it
  // is only worthwhile in the uncommon case of many non-adjacent duplicates.
int64_t size;
if (full_dedup) {
// Maps from tuple to offset of its serialized data in output_batch->tuple_data.
DedupMap distinct_tuples;
RETURN_IF_ERROR(distinct_tuples.Init(num_rows_ * num_tuples_per_row_ * 2, 0));
size = TotalByteSize(&distinct_tuples);
distinct_tuples.Clear(); // Reuse allocated hash table.
RETURN_IF_ERROR(SerializeInternal(size, &distinct_tuples, tuple_offsets, tuple_data));
} else {
size = TotalByteSize(nullptr);
RETURN_IF_ERROR(SerializeInternal(size, nullptr, tuple_offsets, tuple_data));
}
*uncompressed_size = size;
*is_compressed = false;
if (size > 0) {
// Try compressing tuple_data to compression_scratch_, swap if compressed data is
// smaller
Lz4Compressor compressor(nullptr, false);
RETURN_IF_ERROR(compressor.Init());
auto compressor_cleanup =
MakeScopeExitTrigger([&compressor]() { compressor.Close(); });
// If the input size is too large for LZ4 to compress, MaxOutputLen() will return 0.
int64_t compressed_size = compressor.MaxOutputLen(size);
if (compressed_size == 0) {
return Status(TErrorCode::LZ4_COMPRESSION_INPUT_TOO_LARGE, size);
}
DCHECK_GT(compressed_size, 0);
if (compression_scratch_.size() < compressed_size) {
compression_scratch_.resize(compressed_size);
}
uint8_t* input =
const_cast<uint8_t*>(reinterpret_cast<const uint8_t*>(tuple_data->c_str()));
uint8_t* compressed_output = const_cast<uint8_t*>(
reinterpret_cast<const uint8_t*>(compression_scratch_.c_str()));
RETURN_IF_ERROR(
compressor.ProcessBlock(true, size, input, &compressed_size, &compressed_output));
if (LIKELY(compressed_size < size)) {
compression_scratch_.resize(compressed_size);
tuple_data->swap(compression_scratch_);
*is_compressed = true;
}
VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << compressed_size;
}
return Status::OK();
}
bool RowBatch::UseFullDedup() {
  // Switch to using full deduplication in cases where severe size blow-ups are known to
  // be common: when a row contains tuples with collections and there are three or more
  // tuples per row, so non-adjacent duplicate tuples may have been created when joining
  // tuples from multiple sources into a single row.
if (row_desc_->tuple_descriptors().size() < 3) return false;
vector<TupleDescriptor*>::const_iterator tuple_desc =
row_desc_->tuple_descriptors().begin();
for (; tuple_desc != row_desc_->tuple_descriptors().end(); ++tuple_desc) {
if (!(*tuple_desc)->collection_slots().empty()) return true;
}
return false;
}
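// Writes the tuple offsets and deep-copied tuple data of all rows into 'tuple_offsets'
// and 'tuple_data_str'. 'size' must equal the serialized size computed by
// TotalByteSize() with the same dedup settings; 'distinct_tuples' enables full dedup.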
Status RowBatch::SerializeInternal(int64_t size, DedupMap* distinct_tuples,
vector<int32_t>* tuple_offsets, string* tuple_data_str) {
DCHECK(distinct_tuples == nullptr || distinct_tuples->size() == 0);
// The maximum uncompressed RowBatch size that can be serialized is INT_MAX. This
// is because the tuple offsets are int32s and will overflow for a larger size.
if (size > numeric_limits<int32_t>::max()) {
return Status(TErrorCode::ROW_BATCH_TOO_LARGE, size, numeric_limits<int32_t>::max());
}
// TODO: track memory usage
// TODO: detect if serialized size is too large to allocate and return proper error.
tuple_data_str->resize(size);
tuple_offsets->reserve(num_rows_ * num_tuples_per_row_);
// Copy tuple data of unique tuples, including strings, into output_batch (converting
// string pointers into offsets in the process).
int offset = 0; // current offset into output_batch->tuple_data
char* tuple_data = const_cast<char*>(tuple_data_str->c_str());
for (int i = 0; i < num_rows_; ++i) {
vector<TupleDescriptor*>::const_iterator desc =
row_desc_->tuple_descriptors().begin();
for (int j = 0; desc != row_desc_->tuple_descriptors().end(); ++desc, ++j) {
Tuple* tuple = GetRow(i)->GetTuple(j);
if (UNLIKELY(tuple == nullptr)) {
// NULLs are encoded as -1
tuple_offsets->push_back(-1);
continue;
} else if (LIKELY(i > 0) && UNLIKELY(GetRow(i - 1)->GetTuple(j) == tuple)) {
// Fast tuple deduplication for adjacent rows.
int prev_row_idx = tuple_offsets->size() - num_tuples_per_row_;
tuple_offsets->push_back((*tuple_offsets)[prev_row_idx]);
continue;
} else if (UNLIKELY(distinct_tuples != nullptr)) {
if ((*desc)->byte_size() == 0) {
// Zero-length tuples can be represented as nullptr.
tuple_offsets->push_back(-1);
continue;
}
int* dedupd_offset = distinct_tuples->FindOrInsert(tuple, offset);
if (*dedupd_offset != offset) {
// Repeat of tuple
DCHECK_GE(*dedupd_offset, 0);
tuple_offsets->push_back(*dedupd_offset);
continue;
}
}
// Record offset before creating copy (which increments offset and tuple_data)
tuple_offsets->push_back(offset);
tuple->DeepCopy(**desc, &tuple_data, &offset, /* convert_ptrs */ true);
DCHECK_LE(offset, size);
}
}
DCHECK_EQ(offset, size);
return Status::OK();
}
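// Allocates an unreserved buffer pool buffer of at least 'len' bytes (rounded up to a
// power of two, and to the pool's minimum buffer length) on behalf of 'client'.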
Status RowBatch::AllocateBuffer(BufferPool::ClientHandle* client, int64_t len,
BufferPool::BufferHandle* buffer_handle) {
BufferPool* buffer_pool = ExecEnv::GetInstance()->buffer_pool();
int64_t buffer_len = BitUtil::RoundUpToPowerOfTwo(len);
buffer_len = max(buffer_pool->min_buffer_len(), buffer_len);
RETURN_IF_ERROR(
buffer_pool->AllocateUnreservedBuffer(client, buffer_len, buffer_handle));
if (UNLIKELY(!buffer_handle->is_open())) {
return mem_tracker_->MemLimitExceeded(
nullptr, "Failed to allocate row batch", buffer_len);
}
return Status::OK();
}
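// Attaches 'buffer' to this batch; it is freed against 'client' when the batch is
// reset or destroyed.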
void RowBatch::AddBuffer(BufferPool::ClientHandle* client,
BufferPool::BufferHandle&& buffer, FlushMode flush) {
attached_buffer_bytes_ += buffer.len();
BufferInfo buffer_info;
buffer_info.client = client;
buffer_info.buffer = std::move(buffer);
buffers_.push_back(std::move(buffer_info));
if (flush == FlushMode::FLUSH_RESOURCES) MarkFlushResources();
}
void RowBatch::FreeBuffers() {
for (BufferInfo& buffer_info : buffers_) {
ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
buffer_info.client, &buffer_info.buffer);
}
buffers_.clear();
}
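// Returns the batch to an empty state: frees tuple data and attached buffers and
// restores the capacity, but keeps the tuple pointer array.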
void RowBatch::Reset() {
num_rows_ = 0;
capacity_ = tuple_ptrs_size_ / (num_tuples_per_row_ * sizeof(Tuple*));
// TODO: Change this to Clear() and investigate the repercussions.
tuple_data_pool_.FreeAll();
FreeBuffers();
attached_buffer_bytes_ = 0;
flush_mode_ = FlushMode::NO_FLUSH_RESOURCES;
needs_deep_copy_ = false;
}
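// Hands this batch's tuple data pool and attached buffers over to 'dest', propagates
// the deep-copy / flush-resources markers, then resets this batch.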
void RowBatch::TransferResourceOwnership(RowBatch* dest) {
dest->tuple_data_pool_.AcquireData(&tuple_data_pool_, false);
for (BufferInfo& buffer_info : buffers_) {
dest->AddBuffer(
buffer_info.client, std::move(buffer_info.buffer), FlushMode::NO_FLUSH_RESOURCES);
}
buffers_.clear();
if (needs_deep_copy_) {
dest->MarkNeedsDeepCopy();
} else if (flush_mode_ == FlushMode::FLUSH_RESOURCES) {
dest->MarkFlushResources();
}
Reset();
}
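// Re-parents this batch's memory consumption (tuple data pool and tuple pointer array)
// to 'new_tracker'.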
void RowBatch::SetMemTracker(MemTracker* new_tracker) {
tuple_data_pool_.SetMemTracker(new_tracker);
mem_tracker_->TransferTo(new_tracker, tuple_ptrs_size_);
mem_tracker_ = new_tracker;
}
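// Size helpers: GetDeserializedSize() estimates the in-memory footprint after
// deserialization (uncompressed tuple data plus one Tuple* per serialized tuple);
// GetSerializedSize() estimates the on-the-wire footprint of the serialized batch.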
int64_t RowBatch::GetDeserializedSize(const TRowBatch& batch) {
return batch.uncompressed_size + batch.tuple_offsets.size() * sizeof(Tuple*);
}
int64_t RowBatch::GetSerializedSize(const TRowBatch& batch) {
int64_t result = batch.tuple_data.size();
result += batch.row_tuples.size() * sizeof(TTupleId);
result += batch.tuple_offsets.size() * sizeof(int32_t);
return result;
}
int64_t RowBatch::GetDeserializedSize(const RowBatchHeaderPB& header,
const kudu::Slice& tuple_offsets) {
DCHECK_EQ(tuple_offsets.size() % sizeof(int32_t), 0);
return header.uncompressed_size() +
(tuple_offsets.size() / sizeof(int32_t)) * sizeof(Tuple*);
}
int64_t RowBatch::GetDeserializedSize(const OutboundRowBatch& batch) {
return batch.header_.uncompressed_size() + batch.tuple_offsets_.size() * sizeof(Tuple*);
}
int64_t RowBatch::GetSerializedSize(const OutboundRowBatch& batch) {
return batch.tuple_data_.size() + batch.tuple_offsets_.size() * sizeof(int32_t);
}
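// Takes over the rows, tuple pointers and resources of 'src', which must have an
// identical row layout; 'src' is left empty.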
void RowBatch::AcquireState(RowBatch* src) {
DCHECK(row_desc_->LayoutEquals(*src->row_desc_));
DCHECK_EQ(num_tuples_per_row_, src->num_tuples_per_row_);
DCHECK_EQ(tuple_ptrs_size_, src->tuple_ptrs_size_);
// The destination row batch should be empty.
DCHECK(!needs_deep_copy_);
DCHECK_EQ(num_rows_, 0);
DCHECK_EQ(attached_buffer_bytes_, 0);
num_rows_ = src->num_rows_;
capacity_ = src->capacity_;
  // tuple_ptrs_ was allocated with malloc, so it can be swapped between batches.
DCHECK(tuple_ptrs_info_.get() == nullptr);
std::swap(tuple_ptrs_, src->tuple_ptrs_);
src->TransferResourceOwnership(this);
}
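// Deep-copies every row of this batch into 'dst', allocating tuple data from dst's
// tuple data pool. 'dst' must be empty and have sufficient capacity.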
void RowBatch::DeepCopyTo(RowBatch* dst) {
DCHECK(dst->row_desc_->Equals(*row_desc_));
DCHECK_EQ(dst->num_rows_, 0);
DCHECK_GE(dst->capacity_, num_rows_);
dst->AddRows(num_rows_);
for (int i = 0; i < num_rows_; ++i) {
TupleRow* src_row = GetRow(i);
TupleRow* dst_row = reinterpret_cast<TupleRow*>(dst->tuple_ptrs_ +
i * num_tuples_per_row_);
src_row->DeepCopy(
dst_row, row_desc_->tuple_descriptors(), &dst->tuple_data_pool_, false);
}
dst->CommitRows(num_rows_);
}
// TODO: consider computing size of batches as they are built up
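// Returns the total serialized byte size of the batch's unique tuples: fixed-length
// tuple bytes plus variable-length data. 'distinct_tuples', if non-null, is used to
// skip non-adjacent duplicates, mirroring the dedup logic in SerializeInternal().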
int64_t RowBatch::TotalByteSize(DedupMap* distinct_tuples) {
DCHECK(distinct_tuples == nullptr || distinct_tuples->size() == 0);
int64_t result = 0;
vector<int> tuple_count(row_desc_->tuple_descriptors().size(), 0);
// Sum total variable length byte sizes.
for (int i = 0; i < num_rows_; ++i) {
for (int j = 0; j < num_tuples_per_row_; ++j) {
Tuple* tuple = GetRow(i)->GetTuple(j);
if (UNLIKELY(tuple == nullptr)) continue;
// Only count the data of unique tuples.
if (LIKELY(i > 0) && UNLIKELY(GetRow(i - 1)->GetTuple(j) == tuple)) {
// Fast tuple deduplication for adjacent rows.
continue;
} else if (UNLIKELY(distinct_tuples != nullptr)) {
if (row_desc_->tuple_descriptors()[j]->byte_size() == 0) continue;
bool inserted = distinct_tuples->InsertIfNotPresent(tuple, -1);
if (!inserted) continue;
}
result += tuple->VarlenByteSize(*row_desc_->tuple_descriptors()[j]);
++tuple_count[j];
}
}
// Compute sum of fixed component of tuple sizes.
for (int j = 0; j < num_tuples_per_row_; ++j) {
result += row_desc_->tuple_descriptors()[j]->byte_size() * tuple_count[j];
}
return result;
}
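// Caps '*capacity' (keeping at least one row) so the fixed-length tuple buffer stays
// within FIXED_LEN_BUFFER_LIMIT for the given 'row_size', then allocates that buffer
// from 'pool'.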
Status RowBatch::ResizeAndAllocateTupleBuffer(
RuntimeState* state, int64_t* buffer_size, uint8_t** buffer) {
return ResizeAndAllocateTupleBuffer(
state, &tuple_data_pool_, row_desc_->GetRowSize(), &capacity_, buffer_size, buffer);
}
Status RowBatch::ResizeAndAllocateTupleBuffer(RuntimeState* state, MemPool* pool,
int row_size, int* capacity, int64_t* buffer_size, uint8_t** buffer) {
// Avoid divide-by-zero. Don't need to modify capacity for empty rows anyway.
if (row_size != 0) {
*capacity = max(1, min(*capacity, FIXED_LEN_BUFFER_LIMIT / row_size));
}
*buffer_size = static_cast<int64_t>(row_size) * *capacity;
*buffer = pool->TryAllocate(*buffer_size);
if (*buffer == nullptr) {
return pool->mem_tracker()->MemLimitExceeded(
state, "Failed to allocate tuple buffer", *buffer_size);
}
return Status::OK();
}
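// Logs every row of the batch at VLOG_ROW level, prefixed with 'context'.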
void RowBatch::VLogRows(const string& context) {
if (!VLOG_ROW_IS_ON) return;
VLOG_ROW << context << ": #rows=" << num_rows_;
for (int i = 0; i < num_rows_; ++i) {
VLOG_ROW << PrintRow(GetRow(i), *row_desc_);
}
}
}