blob: 6daa02f11830b7a5ce1460d014c847f951edb80f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/row-batch.h"
#include <stdint.h> // for intptr_t
#include <boost/scoped_ptr.hpp>
#include "runtime/mem-tracker.h"
#include "runtime/string-value.h"
#include "runtime/tuple-row.h"
#include "util/compress.h"
#include "util/decompress.h"
#include "util/debug-util.h"
#include "util/fixed-size-hash-table.h"
#include "gen-cpp/Results_types.h"
#include "common/names.h"
// Used to determine memory ownership of a RowBatch's tuple pointers.
DECLARE_bool(enable_partitioned_hash_join);
DECLARE_bool(enable_partitioned_aggregation);
namespace impala {
// Out-of-line definitions for static const members declared (with initializers)
// in the header; required so they can be odr-used (e.g. bound to a reference).
const int RowBatch::AT_CAPACITY_MEM_USAGE;
const int RowBatch::FIXED_LEN_BUFFER_LIMIT;
// Creates an empty batch that can hold up to 'capacity' rows of the layout
// described by 'row_desc', with all memory charged to 'mem_tracker'. Only the
// tuple pointer array is allocated here; tuple data is allocated as rows are
// added.
RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity, MemTracker* mem_tracker)
  : num_rows_(0),
    capacity_(capacity),
    flush_(FlushMode::NO_FLUSH_RESOURCES),
    needs_deep_copy_(false),
    num_tuples_per_row_(row_desc.tuple_descriptors().size()),
    auxiliary_mem_usage_(0),
    tuple_data_pool_(mem_tracker),
    row_desc_(row_desc),
    mem_tracker_(mem_tracker) {
  DCHECK(mem_tracker_ != NULL);
  DCHECK_GT(capacity, 0);
  // One Tuple* slot per (row, tuple descriptor) pair.
  tuple_ptrs_size_ = capacity * num_tuples_per_row_ * sizeof(Tuple*);
  DCHECK_GT(tuple_ptrs_size_, 0);
  // TODO: switch to Init() pattern so we can check memory limit and return Status.
  if (FLAGS_enable_partitioned_aggregation && FLAGS_enable_partitioned_hash_join) {
    // Both partitioned operators enabled: the batch owns the pointer array
    // directly (malloc'd, tracked via Consume) and frees it in the destructor.
    mem_tracker_->Consume(tuple_ptrs_size_);
    tuple_ptrs_ = reinterpret_cast<Tuple**>(malloc(tuple_ptrs_size_));
    DCHECK(tuple_ptrs_ != NULL);
  } else {
    // Legacy mode: the pointer array lives in tuple_data_pool_ and is
    // transferred/freed along with the rest of the pool's memory.
    tuple_ptrs_ = reinterpret_cast<Tuple**>(tuple_data_pool_.Allocate(tuple_ptrs_size_));
  }
}
// TODO: we want our input_batch's tuple_data to come from our (not yet implemented)
// global runtime memory segment; how do we get thrift to allocate it from there?
// maybe change line (in Data_types.cc generated from Data.thrift)
// xfer += iprot->readString(this->tuple_data[_i9]);
// to allocated string data in special mempool
// (change via python script that runs over Data_types.cc)
//
// Deserializing constructor: reconstructs a RowBatch from the thrift-serialized
// 'input_batch'. Decompresses (if needed) and copies tuple data into
// tuple_data_pool_, then converts the serialized tuple offsets and in-tuple
// string offsets back into pointers.
RowBatch::RowBatch(
    const RowDescriptor& row_desc, const TRowBatch& input_batch, MemTracker* mem_tracker)
  : num_rows_(input_batch.num_rows),
    capacity_(input_batch.num_rows),
    flush_(FlushMode::NO_FLUSH_RESOURCES),
    needs_deep_copy_(false),
    num_tuples_per_row_(input_batch.row_tuples.size()),
    auxiliary_mem_usage_(0),
    tuple_data_pool_(mem_tracker),
    row_desc_(row_desc),
    mem_tracker_(mem_tracker) {
  DCHECK(mem_tracker_ != NULL);
  tuple_ptrs_size_ = num_rows_ * input_batch.row_tuples.size() * sizeof(Tuple*);
  DCHECK_EQ(input_batch.row_tuples.size(), row_desc.tuple_descriptors().size());
  DCHECK_GT(tuple_ptrs_size_, 0);
  // TODO: switch to Init() pattern so we can check memory limit and return Status.
  if (FLAGS_enable_partitioned_aggregation && FLAGS_enable_partitioned_hash_join) {
    // Batch owns the pointer array directly (malloc'd, tracked); see destructor.
    mem_tracker_->Consume(tuple_ptrs_size_);
    tuple_ptrs_ = reinterpret_cast<Tuple**>(malloc(tuple_ptrs_size_));
    DCHECK(tuple_ptrs_ != NULL);
  } else {
    // Legacy mode: pointer array is part of tuple_data_pool_.
    tuple_ptrs_ = reinterpret_cast<Tuple**>(tuple_data_pool_.Allocate(tuple_ptrs_size_));
  }
  uint8_t* tuple_data;
  if (input_batch.compression_type != THdfsCompression::NONE) {
    // Decompress tuple data into data pool
    uint8_t* compressed_data = (uint8_t*)input_batch.tuple_data.c_str();
    size_t compressed_size = input_batch.tuple_data.size();
    scoped_ptr<Codec> decompressor;
    Status status = Codec::CreateDecompressor(NULL, false, input_batch.compression_type,
        &decompressor);
    DCHECK(status.ok()) << status.GetDetail();
    int64_t uncompressed_size = input_batch.uncompressed_size;
    DCHECK_NE(uncompressed_size, -1) << "RowBatch decompression failed";
    tuple_data = tuple_data_pool_.Allocate(uncompressed_size);
    // ProcessBlock decompresses into the pre-sized 'tuple_data' buffer.
    status = decompressor->ProcessBlock(true, compressed_size, compressed_data,
        &uncompressed_size, &tuple_data);
    DCHECK(status.ok()) << "RowBatch decompression failed.";
    decompressor->Close();
  } else {
    // Tuple data uncompressed, copy directly into data pool
    tuple_data = tuple_data_pool_.Allocate(input_batch.tuple_data.size());
    memcpy(tuple_data, input_batch.tuple_data.c_str(), input_batch.tuple_data.size());
  }
  // Convert input_batch.tuple_offsets into pointers. An offset of -1 encodes a
  // NULL tuple (see SerializeInternal).
  int tuple_idx = 0;
  for (vector<int32_t>::const_iterator offset = input_batch.tuple_offsets.begin();
      offset != input_batch.tuple_offsets.end(); ++offset) {
    if (*offset == -1) {
      tuple_ptrs_[tuple_idx++] = NULL;
    } else {
      tuple_ptrs_[tuple_idx++] = reinterpret_cast<Tuple*>(tuple_data + *offset);
    }
  }
  // Check whether we have slots that require offset-to-pointer conversion.
  if (!row_desc_.HasVarlenSlots()) return;
  // For every unique tuple, convert string offsets contained in tuple data into
  // pointers. Tuples were serialized in the order we are deserializing them in,
  // so the first occurrence of a tuple will always have a higher offset than any tuple
  // we already converted.
  Tuple* last_converted = NULL;
  for (int i = 0; i < num_rows_; ++i) {
    for (int j = 0; j < num_tuples_per_row_; ++j) {
      const TupleDescriptor* desc = row_desc_.tuple_descriptors()[j];
      if (!desc->HasVarlenSlots()) continue;
      Tuple* tuple = GetRow(i)->GetTuple(j);
      // Handle NULL or already converted tuples with one check: NULL compares
      // <= any converted pointer, and duplicates point at already-converted
      // (lower-offset) data.
      if (tuple <= last_converted) continue;
      last_converted = tuple;
      tuple->ConvertOffsetsToPointers(*desc, tuple_data);
    }
  }
}
// Releases everything this batch owns: pooled tuple data, attached I/O
// buffers, pinned blocks and (in malloc mode) the tuple pointer array.
RowBatch::~RowBatch() {
  tuple_data_pool_.FreeAll();
  for (auto* io_buffer : io_buffers_) io_buffer->Return();
  for (auto* block : blocks_) block->Delete();
  if (FLAGS_enable_partitioned_aggregation && FLAGS_enable_partitioned_hash_join) {
    // The pointer array was malloc'd in the constructor and charged to the
    // tracker; undo both here.
    DCHECK(tuple_ptrs_ != NULL);
    free(tuple_ptrs_);
    mem_tracker_->Release(tuple_ptrs_size_);
    tuple_ptrs_ = NULL;
  }
}
// Serializes this batch into 'output_batch', choosing automatically whether
// full (non-adjacent) tuple deduplication is worthwhile for this row layout.
Status RowBatch::Serialize(TRowBatch* output_batch) {
  return Serialize(output_batch, UseFullDedup());
}
// Serializes this batch into 'output_batch': deduplicates and copies tuple
// data (via SerializeInternal), then attempts LZ4 compression, keeping the
// compressed form only if it is actually smaller.
Status RowBatch::Serialize(TRowBatch* output_batch, bool full_dedup) {
  // why does Thrift not generate a Clear() function?
  output_batch->row_tuples.clear();
  output_batch->tuple_offsets.clear();
  output_batch->compression_type = THdfsCompression::NONE;
  output_batch->num_rows = num_rows_;
  row_desc_.ToThrift(&output_batch->row_tuples);
  // As part of the serialization process we deduplicate tuples to avoid serializing a
  // Tuple multiple times for the RowBatch. By default we only detect duplicate tuples
  // in adjacent rows only. If full deduplication is enabled, we will build a
  // map to detect non-adjacent duplicates. Building this map comes with significant
  // overhead, so is only worthwhile in the uncommon case of many non-adjacent duplicates.
  int64_t size;
  if (full_dedup) {
    // Maps from tuple to offset of its serialized data in output_batch->tuple_data.
    // Sized at 2x the tuple count to keep the load factor low.
    DedupMap distinct_tuples;
    RETURN_IF_ERROR(distinct_tuples.Init(num_rows_ * num_tuples_per_row_ * 2, 0));
    size = TotalByteSize(&distinct_tuples);
    distinct_tuples.Clear(); // Reuse allocated hash table.
    SerializeInternal(size, &distinct_tuples, output_batch);
  } else {
    size = TotalByteSize(NULL);
    SerializeInternal(size, NULL, output_batch);
  }
  if (size > 0) {
    // Try compressing tuple_data to compression_scratch_, swap if compressed data is
    // smaller
    scoped_ptr<Codec> compressor;
    RETURN_IF_ERROR(Codec::CreateCompressor(NULL, false, THdfsCompression::LZ4,
        &compressor));
    int64_t compressed_size = compressor->MaxOutputLen(size);
    // Grow the reusable scratch buffer to the codec's worst-case output size.
    if (compression_scratch_.size() < compressed_size) {
      compression_scratch_.resize(compressed_size);
    }
    uint8_t* input = (uint8_t*)output_batch->tuple_data.c_str();
    uint8_t* compressed_output = (uint8_t*)compression_scratch_.c_str();
    RETURN_IF_ERROR(compressor->ProcessBlock(true, size, input, &compressed_size,
        &compressed_output));
    // Only adopt the compressed form if it shrank the data; otherwise the
    // uncompressed payload stays and compression_type remains NONE.
    if (LIKELY(compressed_size < size)) {
      compression_scratch_.resize(compressed_size);
      output_batch->tuple_data.swap(compression_scratch_);
      output_batch->compression_type = THdfsCompression::LZ4;
    }
    VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << compressed_size;
  }
  return Status::OK();
}
bool RowBatch::UseFullDedup() {
// Switch to using full deduplication in cases where severe size blow-ups are known to
// be common: when a row contains tuples with collections and where there are three or
// more tuples per row so non-adjacent duplicate tuples may have been created when
// joining tuples from multiple sources into a single row.
if (row_desc_.tuple_descriptors().size() < 3) return false;
vector<TupleDescriptor*>::const_iterator tuple_desc =
row_desc_.tuple_descriptors().begin();
for (; tuple_desc != row_desc_.tuple_descriptors().end(); ++tuple_desc) {
if (!(*tuple_desc)->collection_slots().empty()) return true;
}
return false;
}
// Copies this batch's tuple data into 'output_batch', converting in-tuple
// string pointers into offsets. 'size' must be the value returned by
// TotalByteSize() with the same (cleared) 'distinct_tuples' map, so that the
// dedup decisions here exactly mirror the ones made while sizing.
void RowBatch::SerializeInternal(int64_t size, DedupMap* distinct_tuples,
    TRowBatch* output_batch) {
  DCHECK(distinct_tuples == NULL || distinct_tuples->size() == 0);
  // TODO: max_size() is much larger than the amount of memory we could feasibly
  // allocate. Need better way to detect problem.
  DCHECK_LE(size, output_batch->tuple_data.max_size());
  // TODO: track memory usage
  // TODO: detect if serialized size is too large to allocate and return proper error.
  output_batch->tuple_data.resize(size);
  output_batch->uncompressed_size = size;
  output_batch->tuple_offsets.reserve(num_rows_ * num_tuples_per_row_);
  // Copy tuple data of unique tuples, including strings, into output_batch (converting
  // string pointers into offsets in the process).
  int offset = 0; // current offset into output_batch->tuple_data
  char* tuple_data = const_cast<char*>(output_batch->tuple_data.c_str());
  for (int i = 0; i < num_rows_; ++i) {
    vector<TupleDescriptor*>::const_iterator desc = row_desc_.tuple_descriptors().begin();
    for (int j = 0; desc != row_desc_.tuple_descriptors().end(); ++desc, ++j) {
      Tuple* tuple = GetRow(i)->GetTuple(j);
      if (UNLIKELY(tuple == NULL)) {
        // NULLs are encoded as -1
        output_batch->tuple_offsets.push_back(-1);
        continue;
      } else if (LIKELY(i > 0) && UNLIKELY(GetRow(i - 1)->GetTuple(j) == tuple)) {
        // Fast tuple deduplication for adjacent rows: reuse the offset recorded
        // for the same tuple slot in the previous row.
        int prev_row_idx = output_batch->tuple_offsets.size() - num_tuples_per_row_;
        output_batch->tuple_offsets.push_back(
            output_batch->tuple_offsets[prev_row_idx]);
        continue;
      } else if (UNLIKELY(distinct_tuples != NULL)) {
        if ((*desc)->byte_size() == 0) {
          // Zero-length tuples can be represented as NULL.
          output_batch->tuple_offsets.push_back(-1);
          continue;
        }
        int* dedupd_offset = distinct_tuples->FindOrInsert(tuple, offset);
        if (*dedupd_offset != offset) {
          // Repeat of tuple: point at the offset of its first occurrence.
          DCHECK_GE(*dedupd_offset, 0);
          output_batch->tuple_offsets.push_back(*dedupd_offset);
          continue;
        }
      }
      // Record offset before creating copy (which increments offset and tuple_data)
      output_batch->tuple_offsets.push_back(offset);
      tuple->DeepCopy(**desc, &tuple_data, &offset, /* convert_ptrs */ true);
      DCHECK_LE(offset, size);
    }
  }
  // Every byte of the pre-computed size must have been written.
  DCHECK_EQ(offset, size);
}
// Attaches 'buffer' to this batch. The buffer's memory accounting is
// transferred to this batch's tracker; the buffer is Return()ed on
// Reset/destruction or passed on by TransferResourceOwnership().
void RowBatch::AddIoBuffer(DiskIoMgr::BufferDescriptor* buffer) {
  DCHECK(buffer != NULL);
  io_buffers_.push_back(buffer);
  auxiliary_mem_usage_ += buffer->buffer_len();
  buffer->TransferOwnership(mem_tracker_);
}
// Attaches pinned 'block' to this batch; it is Delete()d on Reset/destruction.
// If 'flush' requests it, the batch is marked so that downstream consumers
// flush resources before the block's memory is reclaimed.
void RowBatch::AddBlock(BufferedBlockMgr::Block* block, FlushMode flush) {
  DCHECK(block != NULL);
  DCHECK(block->is_pinned());
  blocks_.push_back(block);
  auxiliary_mem_usage_ += block->buffer_len();
  if (flush == FlushMode::FLUSH_RESOURCES) MarkFlushResources();
}
// Returns the batch to an empty state, releasing all owned resources and
// restoring the original row capacity.
void RowBatch::Reset() {
  num_rows_ = 0;
  // Recompute capacity from the original pointer-array size: AcquireState/
  // ResizeAndAllocateTupleBuffer may have shrunk capacity_ below it.
  capacity_ = tuple_ptrs_size_ / (num_tuples_per_row_ * sizeof(Tuple*));
  // TODO: Change this to Clear() and investigate the repercussions.
  tuple_data_pool_.FreeAll();
  for (int i = 0; i < io_buffers_.size(); ++i) {
    io_buffers_[i]->Return();
  }
  io_buffers_.clear();
  for (int i = 0; i < blocks_.size(); ++i) {
    blocks_[i]->Delete();
  }
  blocks_.clear();
  auxiliary_mem_usage_ = 0;
  if (!FLAGS_enable_partitioned_aggregation || !FLAGS_enable_partitioned_hash_join) {
    // Legacy mode: the pointer array lived in tuple_data_pool_, which was just
    // freed, so it must be re-allocated.
    tuple_ptrs_ = reinterpret_cast<Tuple**>(tuple_data_pool_.Allocate(tuple_ptrs_size_));
  }
  flush_ = FlushMode::NO_FLUSH_RESOURCES;
  needs_deep_copy_ = false;
}
// Moves ownership of all of this batch's resources (tuple data pool, I/O
// buffers, blocks) to 'dest', propagates the deep-copy/flush markers, then
// resets this batch.
void RowBatch::TransferResourceOwnership(RowBatch* dest) {
  dest->tuple_data_pool_.AcquireData(&tuple_data_pool_, false);
  for (int i = 0; i < io_buffers_.size(); ++i) {
    dest->AddIoBuffer(io_buffers_[i]);
  }
  io_buffers_.clear();
  for (int i = 0; i < blocks_.size(); ++i) {
    dest->AddBlock(blocks_[i], FlushMode::NO_FLUSH_RESOURCES);
  }
  blocks_.clear();
  if (needs_deep_copy_) {
    dest->MarkNeedsDeepCopy();
  } else if (flush_ == FlushMode::FLUSH_RESOURCES) {
    dest->MarkFlushResources();
  }
  if (!FLAGS_enable_partitioned_aggregation || !FLAGS_enable_partitioned_hash_join) {
    // Tuple pointers were allocated from tuple_data_pool_ so are transferred.
    // Clear our copy so Reset() re-allocates a fresh array below.
    tuple_ptrs_ = NULL;
  }
  Reset();
}
// Approximates the serialized footprint of 'batch': the raw tuple data plus
// the per-descriptor tuple ids and the per-tuple offset entries.
int RowBatch::GetBatchSize(const TRowBatch& batch) {
  int total_bytes = batch.tuple_data.size();
  total_bytes += batch.row_tuples.size() * sizeof(TTupleId);
  total_bytes += batch.tuple_offsets.size() * sizeof(int32_t);
  return total_bytes;
}
// Takes over the rows and resources of 'src', leaving 'src' empty. This batch
// must itself be empty and have the same row layout as 'src'.
void RowBatch::AcquireState(RowBatch* src) {
  DCHECK(row_desc_.Equals(src->row_desc_));
  DCHECK_EQ(num_tuples_per_row_, src->num_tuples_per_row_);
  DCHECK_EQ(tuple_ptrs_size_, src->tuple_ptrs_size_);
  // The destination row batch should be empty.
  DCHECK(!needs_deep_copy_);
  DCHECK_EQ(num_rows_, 0);
  DCHECK_EQ(auxiliary_mem_usage_, 0);
  num_rows_ = src->num_rows_;
  capacity_ = src->capacity_;
  if (!FLAGS_enable_partitioned_aggregation || !FLAGS_enable_partitioned_hash_join) {
    // Tuple pointers are allocated from tuple_data_pool_ so are transferred.
    tuple_ptrs_ = src->tuple_ptrs_;
    src->tuple_ptrs_ = NULL;
  } else {
    // tuple_ptrs_ were allocated with malloc so can be swapped between batches.
    std::swap(tuple_ptrs_, src->tuple_ptrs_);
  }
  // Pull over the data pool, I/O buffers and blocks; this also Reset()s src.
  src->TransferResourceOwnership(this);
}
// Copies every row of this batch into 'dst', duplicating all tuple data into
// dst's pool so the copy is independent of this batch's memory. 'dst' must be
// empty, share the same row layout, and have sufficient capacity.
void RowBatch::DeepCopyTo(RowBatch* dst) {
  DCHECK(dst->row_desc_.Equals(row_desc_));
  DCHECK_EQ(dst->num_rows_, 0);
  DCHECK_GE(dst->capacity_, num_rows_);
  dst->AddRows(num_rows_);
  for (int i = 0; i < num_rows_; ++i) {
    TupleRow* src_row = GetRow(i);
    // Address dst's row i directly via its tuple pointer array.
    TupleRow* dst_row = reinterpret_cast<TupleRow*>(dst->tuple_ptrs_ +
        i * num_tuples_per_row_);
    src_row->DeepCopy(dst_row, row_desc_.tuple_descriptors(), &dst->tuple_data_pool_,
        false);
  }
  dst->CommitRows(num_rows_);
}
// TODO: consider computing size of batches as they are built up
//
// Returns the number of bytes SerializeInternal() will write for this batch,
// counting each unique tuple once. The dedup logic here must stay in exact
// lockstep with SerializeInternal(): pass the same (empty) 'distinct_tuples'
// map, or NULL for adjacent-row dedup only.
int64_t RowBatch::TotalByteSize(DedupMap* distinct_tuples) {
  DCHECK(distinct_tuples == NULL || distinct_tuples->size() == 0);
  int64_t result = 0;
  // Per-slot count of unique (serialized) tuples, used below for the fixed-
  // length component.
  vector<int> tuple_count(row_desc_.tuple_descriptors().size(), 0);
  // Sum total variable length byte sizes.
  for (int i = 0; i < num_rows_; ++i) {
    for (int j = 0; j < num_tuples_per_row_; ++j) {
      Tuple* tuple = GetRow(i)->GetTuple(j);
      if (UNLIKELY(tuple == NULL)) continue;
      // Only count the data of unique tuples.
      if (LIKELY(i > 0) && UNLIKELY(GetRow(i - 1)->GetTuple(j) == tuple)) {
        // Fast tuple deduplication for adjacent rows.
        continue;
      } else if (UNLIKELY(distinct_tuples != NULL)) {
        // Zero-length tuples are serialized as NULL (-1) and take no space.
        if (row_desc_.tuple_descriptors()[j]->byte_size() == 0) continue;
        bool inserted = distinct_tuples->InsertIfNotPresent(tuple, -1);
        if (!inserted) continue;
      }
      result += tuple->VarlenByteSize(*row_desc_.tuple_descriptors()[j]);
      ++tuple_count[j];
    }
  }
  // Compute sum of fixed component of tuple sizes.
  for (int j = 0; j < num_tuples_per_row_; ++j) {
    result += row_desc_.tuple_descriptors()[j]->byte_size() * tuple_count[j];
  }
  return result;
}
// Shrinks capacity_ (if needed) so the fixed-length tuple data fits within
// FIXED_LEN_BUFFER_LIMIT, then allocates the buffer from tuple_data_pool_.
// On success, returns the buffer and its size via the out-params; on
// allocation failure, returns a mem-limit-exceeded error.
Status RowBatch::ResizeAndAllocateTupleBuffer(RuntimeState* state,
    int64_t* tuple_buffer_size, uint8_t** buffer) {
  const int row_size = row_desc_.GetRowSize();
  // Avoid divide-by-zero. Don't need to modify capacity for empty rows anyway.
  if (row_size != 0) {
    // Clamp to at least one row so progress is always possible.
    capacity_ = max(1, min(capacity_, FIXED_LEN_BUFFER_LIMIT / row_size));
  }
  *tuple_buffer_size = static_cast<int64_t>(row_size) * capacity_;
  *buffer = tuple_data_pool_.TryAllocate(*tuple_buffer_size);
  if (*buffer == NULL) {
    return mem_tracker_->MemLimitExceeded(state, "Failed to allocate tuple buffer",
        *tuple_buffer_size);
  }
  return Status::OK();
}
// Dumps every row of the batch to the row-level verbose log, prefixed by
// 'context'. Cheap early-out when row-level logging is disabled so callers
// can invoke this unconditionally.
void RowBatch::VLogRows(const string& context) {
  if (!VLOG_ROW_IS_ON) return;
  VLOG_ROW << context << ": #rows=" << num_rows_;
  for (int row_idx = 0; row_idx < num_rows_; ++row_idx) {
    VLOG_ROW << PrintRow(GetRow(row_idx), row_desc_);
  }
}
}