blob: a2071ca6cb8483084e2bc41e51278a3380acb9ba [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <sstream>
#include <vector>

#include <boost/scoped_ptr.hpp>

#include "common/init.h"
#include "runtime/mem-tracker.h"
#include "runtime/raw-value.h"
#include "runtime/row-batch.h"
#include "runtime/string-value.h"
#include "runtime/tuple-row.h"
#include "service/fe-support.h"
#include "service/frontend.h"
#include "testutil/desc-tbl-builder.h"
#include "util/benchmark.h"
#include "util/compress.h"
#include "util/cpu-info.h"
#include "util/decompress.h"
#include "util/scope-exit-trigger.h"

#include "common/names.h"
// Benchmark to measure how quickly we can serialize and deserialize row batches. More
// specifically, this benchmark was developed to measure the overhead of deduplication.
// The benchmarks are divided into serialization and deserialization benchmarks.
// The serialization benchmarks test different serialization methods (the baseline of
// no deduplication, the new default of adjacent deduplication, and full deduplication,
// reported as *_full) on row batches with different patterns of duplication: no_dups,
// adjacent_dups and dups (non-adjacent duplicates).
// For all benchmarks we use (int, string) tuples to exercise both variable-length and
// fixed-length slot handling. The small tuples with few slots emphasize per-tuple
// dedup performance rather than per-slot serialization/deserialization performance.
// serialize: Function Rate (iters/ms) Comparison
// ----------------------------------------------------------------------
// ser_no_dups_baseline 17.43 1X
// ser_no_dups 17.33 0.9944X
// ser_no_dups_full 14.1 0.8092X
// ser_adjacent_dups_baseline 26.65 1X
// ser_adjacent_dups 63.98 2.4X
// ser_adjacent_dups_full 55.88 2.096X
// ser_dups_baseline 19.26 1X
// ser_dups 19.55 1.015X
// ser_dups_full 32.4 1.682X
// deserialize: Function Rate (iters/ms) Comparison
// ----------------------------------------------------------------------
// deser_no_dups_baseline 64.94 1X
// deser_no_dups 69.24 1.066X
// deser_adjacent_dups_baseline 112 1X
// deser_adjacent_dups 207.4 1.852X
// deser_dups_baseline 114.8 1X
// deser_dups 208.5 1.817X
// Earlier results with LossyHashTable
// serialize: Function Rate (iters/ms) Comparison
// ----------------------------------------------------------------------
// ser_no_dups_lossy 15.93 0.9139X
// ser_adjacent_dups_lossy 58.21 2.184X
// ser_dups_lossy 50.46 2.62X
// Earlier results with boost::unordered_map
// serialize: Function Rate (iters/ms) Comparison
// ----------------------------------------------------------------------
// ser_no_dups_full 8.73 0.5582X
// ser_adjacent_dups_full 38.7 1.634X
// ser_dups_full 27.5 1.54X
using namespace impala;
// Number of rows added to every benchmark row batch; also used as the batch capacity.
const int NUM_ROWS = 1024;
// Exclusive upper bound on the length of the randomly generated string values.
const int MAX_STRING_LEN = 10;
namespace impala {
// For computing tuple mem layouts. Passed to the DescriptorTblBuilder that builds the
// benchmark's descriptor table; created in main().
static scoped_ptr<Frontend> fe;
// Friend class with access to RowBatch internals
class RowBatchSerializeBaseline {
// Copy of baseline version without dedup logic
static int Serialize(RowBatch* batch, TRowBatch* output_batch) {
output_batch->compression_type = THdfsCompression::NONE;
output_batch->num_rows = batch->num_rows_;
output_batch->tuple_offsets.reserve(batch->num_rows_ * batch->num_tuples_per_row_);
int64_t size = TotalByteSize(batch);
SerializeInternal(batch, size, output_batch);
if (size > 0) {
// Try compressing tuple_data to compression_scratch_, swap if compressed data is
// smaller
Lz4Compressor compressor(nullptr, false);
Status status = compressor.Init();
DCHECK(status.ok()) << status.GetDetail();
auto compressor_cleanup =
MakeScopeExitTrigger([&compressor]() { compressor.Close(); });
int64_t compressed_size = compressor.MaxOutputLen(size);
DCHECK_GT(compressed_size, 0);
if (batch->compression_scratch_.size() < compressed_size) {
uint8_t* input = (uint8_t*)output_batch->tuple_data.c_str();
uint8_t* compressed_output = (uint8_t*)batch->compression_scratch_.c_str();
status =
compressor.ProcessBlock(true, size, input, &compressed_size, &compressed_output);
DCHECK(status.ok()) << status.GetDetail();
if (LIKELY(compressed_size < size)) {
output_batch->compression_type = THdfsCompression::LZ4;
VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << compressed_size;
return RowBatch::GetDeserializedSize(*output_batch);
// Copy of baseline version without dedup logic
static void SerializeInternal(RowBatch* batch, int64_t size, TRowBatch* output_batch) {
DCHECK_LE(size, output_batch->tuple_data.max_size());
output_batch->uncompressed_size = size;
// Copy tuple data of unique tuples, including strings, into output_batch (converting
// string pointers into offsets in the process).
int offset = 0; // current offset into output_batch->tuple_data
char* tuple_data = const_cast<char*>(output_batch->tuple_data.c_str());
for (int i = 0; i < batch->num_rows_; ++i) {
vector<TupleDescriptor*>::const_iterator desc =
for (int j = 0; desc != batch->row_desc_->tuple_descriptors().end(); ++desc, ++j) {
Tuple* tuple = batch->GetRow(i)->GetTuple(j);
if (tuple == NULL) {
// NULLs are encoded as -1
// Record offset before creating copy (which increments offset and tuple_data)
tuple->DeepCopy(**desc, &tuple_data, &offset, /* convert_ptrs */ true);
DCHECK_LE(offset, size);
DCHECK_EQ(offset, size);
// Copy of baseline version without dedup logic
static int64_t TotalByteSize(RowBatch* batch) {
int64_t result = 0;
for (int i = 0; i < batch->num_rows_; ++i) {
for (int j = 0; j < batch->num_tuples_per_row_; ++j) {
Tuple* tuple = batch->GetRow(i)->GetTuple(j);
if (tuple == NULL) continue;
result += tuple->TotalByteSize(*batch->row_desc_->tuple_descriptors()[j]);
return result;
// Copy of baseline version without dedup logic
static void Deserialize(RowBatch* batch, const TRowBatch& input_batch) {
batch->num_rows_ = input_batch.num_rows;
batch->capacity_ = batch->num_rows_;
uint8_t* tuple_data;
if (input_batch.compression_type != THdfsCompression::NONE) {
// Decompress tuple data into data pool
uint8_t* compressed_data = (uint8_t*)input_batch.tuple_data.c_str();
size_t compressed_size = input_batch.tuple_data.size();
Lz4Decompressor decompressor(nullptr, false);
Status status = decompressor.Init();
DCHECK(status.ok()) << status.GetDetail();
auto compressor_cleanup =
MakeScopeExitTrigger([&decompressor]() { decompressor.Close(); });
int64_t uncompressed_size = input_batch.uncompressed_size;
DCHECK_NE(uncompressed_size, -1) << "RowBatch decompression failed";
tuple_data = batch->tuple_data_pool()->Allocate(uncompressed_size);
status = decompressor.ProcessBlock(
true, compressed_size, compressed_data, &uncompressed_size, &tuple_data);
DCHECK(status.ok()) << "RowBatch decompression failed.";
} else {
// Tuple data uncompressed, copy directly into data pool
tuple_data = batch->tuple_data_pool()->Allocate(input_batch.tuple_data.size());
memcpy(tuple_data, input_batch.tuple_data.c_str(), input_batch.tuple_data.size());
// Convert input_batch.tuple_offsets into pointers
int tuple_idx = 0;
for (vector<int32_t>::const_iterator offset = input_batch.tuple_offsets.begin();
offset != input_batch.tuple_offsets.end(); ++offset) {///
if (*offset == -1) {
batch->tuple_ptrs_[tuple_idx++] = NULL;
} else {
batch->tuple_ptrs_[tuple_idx++] = reinterpret_cast<Tuple*>(tuple_data + *offset);
// Check whether we have slots that require offset-to-pointer conversion.
if (!batch->row_desc_->HasVarlenSlots()) return;
for (int i = 0; i < batch->num_rows_; ++i) {
for (int j = 0; j < batch->num_tuples_per_row_; ++j) {
const TupleDescriptor* desc = batch->row_desc_->tuple_descriptors()[j];
if (!desc->HasVarlenSlots()) continue;
Tuple* tuple = batch->GetRow(i)->GetTuple(j);
if (tuple == NULL) continue;
tuple->ConvertOffsetsToPointers(*desc, tuple_data);
class RowBatchSerializeBenchmark {
// Fill batch with (int, string) tuples with random data.
static void FillBatch(RowBatch* batch, int rand_seed, int repeats, int cycle) {
if (cycle <= 0) cycle = NUM_ROWS; // Negative means no repeats in cycle.
MemPool* mem_pool = batch->tuple_data_pool();
const TupleDescriptor* tuple_desc = batch->row_desc()->tuple_descriptors()[0];
int unique_tuples = (NUM_ROWS - 1) / repeats + 1;
uint8_t* tuple_mem = mem_pool->Allocate(tuple_desc->byte_size() * unique_tuples);
for (int i = 0; i < NUM_ROWS; ++i) {
int row_idx = batch->AddRow();
TupleRow* row = batch->GetRow(row_idx);
Tuple* tuple;
if (i >= cycle) {
// Duplicate of tuple from previous cycle.
tuple = batch->GetRow(i - cycle)->GetTuple(0);
} else if (i % repeats == 0) {
// Generate new unique tuple.
tuple = reinterpret_cast<Tuple*>(tuple_mem);
tuple_mem += tuple_desc->byte_size();
int int_val = rand();
RawValue::Write(&int_val, tuple, tuple_desc->slots()[0], mem_pool);
char string_buf[MAX_STRING_LEN + 1];
int string_len = rand() % MAX_STRING_LEN;
for (int j = 0; j < string_len; ++j) {
string_buf[j] = (char)rand() % 256;
StringValue string_val(string_buf, string_len);
RawValue::Write(&string_val, tuple, tuple_desc->slots()[1], mem_pool);
} else {
// Duplicate of previous.
tuple = batch->GetRow(i - 1)->GetTuple(0);
row->SetTuple(0, tuple);
struct SerializeArgs {
RowBatch* batch;
bool full_dedup;
static void TestSerialize(int batch_size, void* data) {
SerializeArgs* args = reinterpret_cast<SerializeArgs*>(data);
for (int iter = 0; iter < batch_size; ++iter) {
TRowBatch trow_batch;
ABORT_IF_ERROR(args->batch->Serialize(&trow_batch, args->full_dedup));
static void TestSerializeBaseline(int batch_size, void* data) {
RowBatch* batch = reinterpret_cast<RowBatch*>(data);
for (int iter = 0; iter < batch_size; ++iter) {
TRowBatch trow_batch;
RowBatchSerializeBaseline::Serialize(batch, &trow_batch);
struct DeserializeArgs {
TRowBatch* trow_batch;
RowDescriptor* row_desc;
MemTracker* tracker;
static void TestDeserialize(int batch_size, void* data) {
struct DeserializeArgs* args = reinterpret_cast<struct DeserializeArgs*>(data);
for (int iter = 0; iter < batch_size; ++iter) {
RowBatch deserialized_batch(args->row_desc, *args->trow_batch, args->tracker);
static void TestDeserializeBaseline(int batch_size, void* data) {
struct DeserializeArgs* args = reinterpret_cast<struct DeserializeArgs*>(data);
for (int iter = 0; iter < batch_size; ++iter) {
RowBatch deserialized_batch(
args->row_desc, args->trow_batch->num_rows, args->tracker);
RowBatchSerializeBaseline::Deserialize(&deserialized_batch, *args->trow_batch);
static void Run() {
MemTracker tracker;
MemPool mem_pool(&tracker);
ObjectPool obj_pool;
DescriptorTblBuilder builder(fe.get(), &obj_pool);
builder.DeclareTuple() << TYPE_INT << TYPE_STRING;
DescriptorTbl* desc_tbl = builder.Build();
vector<bool> nullable_tuples(1, false);
vector<TTupleId> tuple_id(1, (TTupleId) 0);
RowDescriptor row_desc(*desc_tbl, tuple_id, nullable_tuples);
RowBatch* no_dup_batch = obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, &tracker));
FillBatch(no_dup_batch, 12345, 1, -1);
TRowBatch no_dup_tbatch;
RowBatch* adjacent_dup_batch =
obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, &tracker));
FillBatch(adjacent_dup_batch, 12345, 5, -1);
TRowBatch adjacent_dup_tbatch;
ABORT_IF_ERROR(adjacent_dup_batch->Serialize(&adjacent_dup_tbatch, false));
RowBatch* dup_batch = obj_pool.Add(new RowBatch(&row_desc, NUM_ROWS, &tracker));
// Non-adjacent duplicates.
FillBatch(dup_batch, 12345, 1, NUM_ROWS / 5);
TRowBatch dup_tbatch;
ABORT_IF_ERROR(dup_batch->Serialize(&dup_tbatch, true));
int baseline;
Benchmark ser_suite("serialize");
baseline = ser_suite.AddBenchmark("ser_no_dups_baseline", TestSerializeBaseline,
no_dup_batch, -1);
struct SerializeArgs no_dup_ser_args = { no_dup_batch, false };
struct SerializeArgs no_dup_ser_full_args = { no_dup_batch, true };
ser_suite.AddBenchmark("ser_no_dups", TestSerialize, &no_dup_ser_args, baseline);
TestSerialize, &no_dup_ser_full_args, baseline);
baseline = ser_suite.AddBenchmark("ser_adjacent_dups_baseline",
TestSerializeBaseline, adjacent_dup_batch, -1);
struct SerializeArgs adjacent_dup_ser_args = { adjacent_dup_batch, false };
struct SerializeArgs adjacent_dup_ser_full_args = { adjacent_dup_batch, true };
TestSerialize, &adjacent_dup_ser_args, baseline);
TestSerialize, &adjacent_dup_ser_full_args, baseline);
baseline = ser_suite.AddBenchmark("ser_dups_baseline",
TestSerializeBaseline, dup_batch, -1);
struct SerializeArgs dup_ser_args = { dup_batch, false };
struct SerializeArgs dup_ser_full_args = { dup_batch, true };
ser_suite.AddBenchmark("ser_dups", TestSerialize, &dup_ser_args, baseline);
ser_suite.AddBenchmark("ser_dups_full", TestSerialize, &dup_ser_full_args, baseline);
cout << ser_suite.Measure() << endl;
Benchmark deser_suite("deserialize");
struct DeserializeArgs no_dup_deser_args = { &no_dup_tbatch, &row_desc, &tracker };
baseline = deser_suite.AddBenchmark("deser_no_dups_baseline",
TestDeserializeBaseline, &no_dup_deser_args, -1);
TestDeserialize, &no_dup_deser_args, baseline);
struct DeserializeArgs adjacent_dup_deser_args = { &adjacent_dup_tbatch, &row_desc,
&tracker };
baseline = deser_suite.AddBenchmark("deser_adjacent_dups_baseline",
TestDeserializeBaseline, &adjacent_dup_deser_args, -1);
TestDeserialize, &adjacent_dup_deser_args, baseline);
struct DeserializeArgs dup_deser_args = { &dup_tbatch, &row_desc, &tracker };
baseline = deser_suite.AddBenchmark("deser_dups_baseline",
TestDeserializeBaseline, &dup_deser_args, -1);
deser_suite.AddBenchmark("deser_dups", TestDeserialize, &dup_deser_args, baseline);
cout << deser_suite.Measure() << endl;
// Entry point: initializes the common runtime, creates the frontend used for
// computing tuple layouts, and runs the serialization benchmarks. Returns 0.
int main(int argc, char** argv) {
  impala::InitCommonRuntime(argc, argv, true);
  // NOTE(review): "service/fe-support.h" is included but nothing from it is called in
  // this file; confirm whether InitFeSupport() must run before the Frontend is created.
  fe.reset(new Frontend());
  // Without this call the binary exits without measuring anything.
  impala::RowBatchSerializeBenchmark::Run();
  return 0;
}