// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <gtest/gtest.h>
#include <boost/bind.hpp>
#include <boost/filesystem.hpp>
#include <boost/scoped_ptr.hpp>
#include <limits> // for std::numeric_limits<int>::max()
#include <string>
#include "gen_cpp/Types_types.h"
#include "runtime/buffered_tuple_stream2.inline.h"
#include "runtime/row_batch.h"
#include "runtime/string_value.hpp"
#include "runtime/test_env.h"
#include "runtime/tmp_file_mgr.h"
#include "runtime/types.h"
#include "testutil/desc_tbl_builder.h"
#include "util/cpu_info.h"
#include "util/debug_util.h"
#include "util/disk_info.h"
#include "util/logging.h"
using std::vector;
using boost::scoped_ptr;
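// Number of rows per generated batch and a large prime used to produce varied int values.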
static const int BATCH_SIZE = 250;
static const uint32_t PRIME = 479001599;
namespace doris {
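// Fixed string payloads that the generated string batches cycle through.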
static const StringValue STRINGS[] = {
StringValue("ABC"),
StringValue("HELLO"),
StringValue("123456789"),
StringValue("FOOBAR"),
StringValue("ONE"),
StringValue("THREE"),
StringValue("abcdefghijklmno"),
StringValue("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
StringValue("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"),
};
static const int NUM_STRINGS = sizeof(STRINGS) / sizeof(StringValue);
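// Tests with a single non-NULLable tuple per row.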
class SimpleTupleStreamTest : public testing::Test {
public:
SimpleTupleStreamTest() : _tracker(new MemTracker(-1)) {}
    // An empty destructor to pass the code style check.
~SimpleTupleStreamTest() {}
protected:
virtual void SetUp() {
_test_env.reset(new TestEnv());
create_descriptors();
_mem_pool.reset(new MemPool(_tracker.get()));
}
virtual void create_descriptors() {
std::vector<bool> nullable_tuples(1, false);
std::vector<TTupleId> tuple_ids(1, static_cast<TTupleId>(0));
DescriptorTblBuilder int_builder(&_pool);
int_builder.declare_tuple() << TYPE_INT;
_int_desc = _pool.add(new RowDescriptor(*int_builder.build(), tuple_ids, nullable_tuples));
DescriptorTblBuilder string_builder(&_pool);
// string_builder.declare_tuple() << TYPE_STRING;
string_builder.declare_tuple() << TYPE_VARCHAR;
_string_desc =
_pool.add(new RowDescriptor(*string_builder.build(), tuple_ids, nullable_tuples));
}
virtual void TearDown() {
_runtime_state = NULL;
_client = NULL;
_pool.clear();
_mem_pool->free_all();
_test_env.reset();
}
    // Set up a block manager with the provided settings and register a client with no
    // reservation, tracked by _tracker.
void InitBlockMgr(int64_t limit, int block_size) {
Status status = _test_env->create_query_state(0, limit, block_size, &_runtime_state);
ASSERT_TRUE(status.ok());
status = _runtime_state->block_mgr2()->register_client(0, _tracker, _runtime_state,
&_client);
ASSERT_TRUE(status.ok());
}
// Generate the ith element of a sequence of int values.
int GenIntValue(int i) {
// Multiply by large prime to get varied bit patterns.
return i * PRIME;
}
// Generate the ith element of a sequence of bool values.
bool GenBoolValue(int i) {
// Use a middle bit of the int value.
return ((GenIntValue(i) >> 8) & 0x1) != 0;
}
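    // Create a row batch of 'num_rows' int tuples, where row i holds GenIntValue(i + offset).
    // Each tuple consists of a one-byte null indicator followed by the int slot. If 'gen_null'
    // is true, tuple pointers are set to NULL for the indices where GenBoolValue() is false.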
virtual RowBatch* CreateIntBatch(int offset, int num_rows, bool gen_null) {
RowBatch* batch = _pool.add(new RowBatch(*_int_desc, num_rows, _tracker.get()));
int tuple_size = _int_desc->tuple_descriptors()[0]->byte_size();
uint8_t* tuple_mem = reinterpret_cast<uint8_t*>(
batch->tuple_data_pool()->allocate(tuple_size * num_rows));
memset(tuple_mem, 0, tuple_size * num_rows);
const int int_tuples = _int_desc->tuple_descriptors().size();
for (int i = 0; i < num_rows; ++i) {
int idx = batch->add_row();
TupleRow* row = batch->get_row(idx);
Tuple* int_tuple = reinterpret_cast<Tuple*>(tuple_mem + i * tuple_size);
            // Write the int value just past the one-byte null indicator at the start of the tuple.
*reinterpret_cast<int*>(reinterpret_cast<uint8_t*>(int_tuple) + 1) =
GenIntValue(i + offset);
for (int j = 0; j < int_tuples; ++j) {
int idx = (i + offset) * int_tuples + j;
if (!gen_null || GenBoolValue(idx)) {
row->set_tuple(j, int_tuple);
} else {
row->set_tuple(j, NULL);
}
}
batch->commit_last_row();
}
return batch;
}
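    // Create a row batch of 'num_rows' string tuples that cycle through STRINGS. The tuple
    // layout mirrors CreateIntBatch(): a one-byte null indicator followed by a StringValue
    // slot, with NULL tuples generated the same way when 'gen_null' is true.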
virtual RowBatch* CreateStringBatch(int offset, int num_rows, bool gen_null) {
int tuple_size = sizeof(StringValue) + 1;
RowBatch* batch = _pool.add(new RowBatch(*_string_desc, num_rows, _tracker.get()));
uint8_t* tuple_mem = batch->tuple_data_pool()->allocate(tuple_size * num_rows);
memset(tuple_mem, 0, tuple_size * num_rows);
const int string_tuples = _string_desc->tuple_descriptors().size();
for (int i = 0; i < num_rows; ++i) {
TupleRow* row = batch->get_row(batch->add_row());
*reinterpret_cast<StringValue*>(tuple_mem + 1) = STRINGS[(i + offset) % NUM_STRINGS];
for (int j = 0; j < string_tuples; ++j) {
int idx = (i + offset) * string_tuples + j;
if (!gen_null || GenBoolValue(idx)) {
row->set_tuple(j, reinterpret_cast<Tuple*>(tuple_mem));
} else {
row->set_tuple(j, NULL);
}
}
batch->commit_last_row();
tuple_mem += tuple_size;
}
return batch;
}
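    // Append the value of every tuple in 'row' to 'results'.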
void AppendRowTuples(TupleRow* row, std::vector<int>* results) {
DCHECK(row != NULL);
const int int_tuples = _int_desc->tuple_descriptors().size();
for (int i = 0; i < int_tuples; ++i) {
AppendValue(row->get_tuple(i), results);
}
}
void AppendRowTuples(TupleRow* row, std::vector<StringValue>* results) {
DCHECK(row != NULL);
const int string_tuples = _string_desc->tuple_descriptors().size();
for (int i = 0; i < string_tuples; ++i) {
AppendValue(row->get_tuple(i), results);
}
}
void AppendValue(Tuple* t, std::vector<int>* results) {
if (t == NULL) {
            // The tests use the max int value as a sentinel for NULL tuples.
results->push_back(std::numeric_limits<int>::max());
} else {
results->push_back(*reinterpret_cast<int*>(reinterpret_cast<uint8_t*>(t) + 1));
}
}
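    // The string payload is copied into _mem_pool so it stays valid after the stream's
    // buffers are freed or reused.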
void AppendValue(Tuple* t, std::vector<StringValue>* results) {
if (t == NULL) {
results->push_back(StringValue());
} else {
uint8_t* mem = reinterpret_cast<uint8_t*>(t);
StringValue sv = *reinterpret_cast<StringValue*>(mem + 1);
uint8_t* copy = _mem_pool->allocate(sv.len);
memcpy(copy, sv.ptr, sv.len);
sv.ptr = reinterpret_cast<char*>(copy);
results->push_back(sv);
}
}
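    // Read batches from 'stream' and append their values to 'results'. A non-negative
    // 'num_batches' limits how many batches are read; otherwise read until eos.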
template <typename T>
void ReadValues(BufferedTupleStream2* stream, RowDescriptor* desc, std::vector<T>* results,
int num_batches = -1) {
bool eos = false;
RowBatch batch(*desc, BATCH_SIZE, _tracker.get());
int batches_read = 0;
do {
batch.reset();
Status status = stream->get_next(&batch, &eos);
EXPECT_TRUE(status.ok());
++batches_read;
for (int i = 0; i < batch.num_rows(); ++i) {
AppendRowTuples(batch.get_row(i), results);
}
} while (!eos && (num_batches < 0 || batches_read <= num_batches));
}
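    // Verify that 'results' contains 'exp_rows' rows of generated int values, using the
    // max int sentinel for NULL tuples when 'gen_null' is true.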
virtual void VerifyResults(const std::vector<int>& results, int exp_rows, bool gen_null) {
const int int_tuples = _int_desc->tuple_descriptors().size();
EXPECT_EQ(results.size(), exp_rows * int_tuples);
for (int i = 0; i < exp_rows; ++i) {
for (int j = 0; j < int_tuples; ++j) {
int idx = i * int_tuples + j;
if (!gen_null || GenBoolValue(idx)) {
ASSERT_EQ(results[idx], GenIntValue(i))
<< " results[" << idx << "]: " << results[idx]
<< " != " << GenIntValue(i) << " gen_null=" << gen_null;
} else {
ASSERT_TRUE(results[idx] == std::numeric_limits<int>::max())
<< "i: " << i << " j: " << j << " results[" << idx
<< "]: " << results[idx] << " != " << std::numeric_limits<int>::max();
}
}
}
}
virtual void VerifyResults(const std::vector<StringValue>& results, int exp_rows,
bool gen_null) {
const int string_tuples = _string_desc->tuple_descriptors().size();
EXPECT_EQ(results.size(), exp_rows * string_tuples);
for (int i = 0; i < exp_rows; ++i) {
for (int j = 0; j < string_tuples; ++j) {
int idx = i * string_tuples + j;
if (!gen_null || GenBoolValue(idx)) {
ASSERT_TRUE(results[idx] == STRINGS[i % NUM_STRINGS])
<< "results[" << idx << "] " << results[idx]
<< " != " << STRINGS[i % NUM_STRINGS] << " i=" << i
<< " gen_null=" << gen_null;
} else {
ASSERT_TRUE(results[idx] == StringValue())
<< "results[" << idx << "] " << results[idx] << " not NULL";
}
}
}
}
// Test adding num_batches of ints to the stream and reading them back.
template <typename T>
void TestValues(int num_batches, RowDescriptor* desc, bool gen_null) {
BufferedTupleStream2 stream(_runtime_state, *desc, _runtime_state->block_mgr2(), _client,
true, false);
Status status = stream.init(-1, NULL, true);
ASSERT_TRUE(status.ok()) << status.get_error_msg();
status = stream.unpin_stream();
ASSERT_TRUE(status.ok());
// Add rows to the stream
int offset = 0;
for (int i = 0; i < num_batches; ++i) {
RowBatch* batch = NULL;
if (sizeof(T) == sizeof(int)) {
batch = CreateIntBatch(offset, BATCH_SIZE, gen_null);
} else if (sizeof(T) == sizeof(StringValue)) {
batch = CreateStringBatch(offset, BATCH_SIZE, gen_null);
} else {
DCHECK(false);
}
for (int j = 0; j < batch->num_rows(); ++j) {
bool b = stream.add_row(batch->get_row(j), &status);
ASSERT_TRUE(status.ok());
if (!b) {
ASSERT_TRUE(stream.using_small_buffers());
bool got_buffer;
status = stream.switch_to_io_buffers(&got_buffer);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(got_buffer);
b = stream.add_row(batch->get_row(j), &status);
ASSERT_TRUE(status.ok());
}
ASSERT_TRUE(b);
}
offset += batch->num_rows();
// Reset the batch to make sure the stream handles the memory correctly.
batch->reset();
}
status = stream.prepare_for_read(false);
ASSERT_TRUE(status.ok());
// Read all the rows back
std::vector<T> results;
ReadValues(&stream, desc, &results);
// Verify result
VerifyResults(results, BATCH_SIZE * num_batches, gen_null);
stream.close();
}
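    // Interleave adding and reading int batches on a read/write stream, running the whole
    // sequence once with small initial buffers and once with I/O-sized buffers.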
void TestIntValuesInterleaved(int num_batches, int num_batches_before_read) {
for (int small_buffers = 0; small_buffers < 2; ++small_buffers) {
BufferedTupleStream2 stream(_runtime_state, *_int_desc, _runtime_state->block_mgr2(),
_client, small_buffers == 0, // initial small buffers
true); // read_write
Status status = stream.init(-1, NULL, true);
ASSERT_TRUE(status.ok());
status = stream.prepare_for_read(true);
ASSERT_TRUE(status.ok());
status = stream.unpin_stream();
ASSERT_TRUE(status.ok());
std::vector<int> results;
for (int i = 0; i < num_batches; ++i) {
RowBatch* batch = CreateIntBatch(i * BATCH_SIZE, BATCH_SIZE, false);
for (int j = 0; j < batch->num_rows(); ++j) {
bool b = stream.add_row(batch->get_row(j), &status);
ASSERT_TRUE(b);
ASSERT_TRUE(status.ok());
}
// Reset the batch to make sure the stream handles the memory correctly.
batch->reset();
if (i % num_batches_before_read == 0) {
ReadValues(&stream, _int_desc, &results,
(rand() % num_batches_before_read) + 1);
}
}
ReadValues(&stream, _int_desc, &results);
VerifyResults(results, BATCH_SIZE * num_batches, false);
stream.close();
}
}
boost::scoped_ptr<TestEnv> _test_env;
RuntimeState* _runtime_state;
BufferedBlockMgr2::Client* _client;
std::shared_ptr<MemTracker> _tracker;
ObjectPool _pool;
RowDescriptor* _int_desc;
RowDescriptor* _string_desc;
boost::scoped_ptr<MemPool> _mem_pool;
};
// Tests with a single NULLable tuple per row.
class SimpleNullStreamTest : public SimpleTupleStreamTest {
protected:
virtual void create_descriptors() {
std::vector<bool> nullable_tuples(1, true);
std::vector<TTupleId> tuple_ids(1, static_cast<TTupleId>(0));
DescriptorTblBuilder int_builder(&_pool);
int_builder.declare_tuple() << TYPE_INT;
_int_desc = _pool.add(new RowDescriptor(*int_builder.build(), tuple_ids, nullable_tuples));
DescriptorTblBuilder string_builder(&_pool);
string_builder.declare_tuple() << TYPE_VARCHAR;
_string_desc =
_pool.add(new RowDescriptor(*string_builder.build(), tuple_ids, nullable_tuples));
}
}; // SimpleNullStreamTest
// Tests with multiple non-NULLable tuples per row.
class MultiTupleStreamTest : public SimpleTupleStreamTest {
protected:
virtual void create_descriptors() {
std::vector<bool> nullable_tuples;
nullable_tuples.push_back(false);
nullable_tuples.push_back(false);
nullable_tuples.push_back(false);
std::vector<TTupleId> tuple_ids;
tuple_ids.push_back(static_cast<TTupleId>(0));
tuple_ids.push_back(static_cast<TTupleId>(1));
tuple_ids.push_back(static_cast<TTupleId>(2));
DescriptorTblBuilder int_builder(&_pool);
int_builder.declare_tuple() << TYPE_INT;
int_builder.declare_tuple() << TYPE_INT;
int_builder.declare_tuple() << TYPE_INT;
_int_desc = _pool.add(new RowDescriptor(*int_builder.build(), tuple_ids, nullable_tuples));
DescriptorTblBuilder string_builder(&_pool);
string_builder.declare_tuple() << TYPE_VARCHAR;
string_builder.declare_tuple() << TYPE_VARCHAR;
string_builder.declare_tuple() << TYPE_VARCHAR;
_string_desc =
_pool.add(new RowDescriptor(*string_builder.build(), tuple_ids, nullable_tuples));
}
};
// Tests with multiple tuples per row, two of which are NULLable.
class MultiNullableTupleStreamTest : public SimpleTupleStreamTest {
protected:
virtual void create_descriptors() {
std::vector<bool> nullable_tuples;
nullable_tuples.push_back(false);
nullable_tuples.push_back(true);
nullable_tuples.push_back(true);
std::vector<TTupleId> tuple_ids;
tuple_ids.push_back(static_cast<TTupleId>(0));
tuple_ids.push_back(static_cast<TTupleId>(1));
tuple_ids.push_back(static_cast<TTupleId>(2));
DescriptorTblBuilder int_builder(&_pool);
int_builder.declare_tuple() << TYPE_INT;
int_builder.declare_tuple() << TYPE_INT;
int_builder.declare_tuple() << TYPE_INT;
_int_desc = _pool.add(new RowDescriptor(*int_builder.build(), tuple_ids, nullable_tuples));
DescriptorTblBuilder string_builder(&_pool);
string_builder.declare_tuple() << TYPE_VARCHAR;
string_builder.declare_tuple() << TYPE_VARCHAR;
string_builder.declare_tuple() << TYPE_VARCHAR;
_string_desc =
_pool.add(new RowDescriptor(*string_builder.build(), tuple_ids, nullable_tuples));
}
};
#if 0
// Tests with collection types.
class ArrayTupleStreamTest : public SimpleTupleStreamTest {
protected:
RowDescriptor* _array_desc;
virtual void create_descriptors() {
// tuples: (array<string>, array<array<int>>) (array<int>)
std::vector<bool> nullable_tuples(2, true);
std::vector<TTupleId> tuple_ids;
tuple_ids.push_back(static_cast<TTupleId>(0));
tuple_ids.push_back(static_cast<TTupleId>(1));
TypeDescriptor string_array_type;
string_array_type.type = TYPE_ARRAY;
string_array_type.children.push_back(TYPE_VARCHAR);
TypeDescriptor int_array_type;
int_array_type.type = TYPE_ARRAY;
        int_array_type.children.push_back(TYPE_INT);
TypeDescriptor nested_array_type;
nested_array_type.type = TYPE_ARRAY;
nested_array_type.children.push_back(int_array_type);
DescriptorTblBuilder builder(&_pool);
builder.declare_tuple() << string_array_type << nested_array_type;
builder.declare_tuple() << int_array_type;
_array_desc = _pool.add(new RowDescriptor(
*builder.build(), tuple_ids, nullable_tuples));
}
};
#endif
// Basic API test. No data should be going to disk.
TEST_F(SimpleTupleStreamTest, Basic) {
InitBlockMgr(-1, 8 * 1024 * 1024);
TestValues<int>(1, _int_desc, false);
TestValues<int>(10, _int_desc, false);
TestValues<int>(100, _int_desc, false);
TestValues<StringValue>(1, _string_desc, false);
TestValues<StringValue>(10, _string_desc, false);
TestValues<StringValue>(100, _string_desc, false);
TestIntValuesInterleaved(1, 1);
TestIntValuesInterleaved(10, 5);
TestIntValuesInterleaved(100, 15);
}
// #if 0
// Test with only 1 buffer.
TEST_F(SimpleTupleStreamTest, OneBufferSpill) {
// Each buffer can only hold 100 ints, so this spills quite often.
int buffer_size = 100 * sizeof(int);
InitBlockMgr(buffer_size, buffer_size);
TestValues<int>(1, _int_desc, false);
TestValues<int>(10, _int_desc, false);
TestValues<StringValue>(1, _string_desc, false);
TestValues<StringValue>(10, _string_desc, false);
}
// Test with a few buffers.
TEST_F(SimpleTupleStreamTest, ManyBufferSpill) {
int buffer_size = 100 * sizeof(int);
InitBlockMgr(10 * buffer_size, buffer_size);
TestValues<int>(1, _int_desc, false);
TestValues<int>(10, _int_desc, false);
TestValues<int>(100, _int_desc, false);
TestValues<StringValue>(1, _string_desc, false);
TestValues<StringValue>(10, _string_desc, false);
TestValues<StringValue>(100, _string_desc, false);
TestIntValuesInterleaved(1, 1);
TestIntValuesInterleaved(10, 5);
TestIntValuesInterleaved(100, 15);
}
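// Fill a pinned stream until it can no longer add rows, unpin and re-pin it, then read the
// rows back several times and verify them.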
TEST_F(SimpleTupleStreamTest, UnpinPin) {
int buffer_size = 100 * sizeof(int);
InitBlockMgr(3 * buffer_size, buffer_size);
BufferedTupleStream2 stream(_runtime_state, *_int_desc, _runtime_state->block_mgr2(), _client,
true, false);
Status status = stream.init(-1, NULL, true);
ASSERT_TRUE(status.ok());
int offset = 0;
bool full = false;
while (!full) {
RowBatch* batch = CreateIntBatch(offset, BATCH_SIZE, false);
int j = 0;
for (; j < batch->num_rows(); ++j) {
full = !stream.add_row(batch->get_row(j), &status);
ASSERT_TRUE(status.ok());
if (full) {
break;
}
}
offset += j;
}
status = stream.unpin_stream();
ASSERT_TRUE(status.ok());
bool pinned = false;
status = stream.pin_stream(false, &pinned);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(pinned);
std::vector<int> results;
    // Read and verify the results a few times. We should be able to re-read the stream
    // as long as delete-on-read mode is not used.
int read_iters = 3;
for (int i = 0; i < read_iters; ++i) {
bool delete_on_read = i == read_iters - 1;
status = stream.prepare_for_read(delete_on_read);
ASSERT_TRUE(status.ok());
results.clear();
ReadValues(&stream, _int_desc, &results);
VerifyResults(results, offset, false);
}
// After delete_on_read, all blocks aside from the last should be deleted.
// Note: this should really be 0, but the BufferedTupleStream2 returns eos before
// deleting the last block, rather than after, so the last block isn't deleted
// until the stream is closed.
DCHECK_EQ(stream.bytes_in_mem(false), buffer_size);
stream.close();
DCHECK_EQ(stream.bytes_in_mem(false), 0);
}
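// Test that a stream starts out with small buffers and switches to I/O-sized buffers once
// it outgrows them.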
TEST_F(SimpleTupleStreamTest, SmallBuffers) {
int buffer_size = 8 * 1024 * 1024;
InitBlockMgr(2 * buffer_size, buffer_size);
BufferedTupleStream2 stream(_runtime_state, *_int_desc, _runtime_state->block_mgr2(), _client,
true, false);
Status status = stream.init(-1, NULL, false);
ASSERT_TRUE(status.ok());
// Initial buffer should be small.
EXPECT_LT(stream.bytes_in_mem(false), buffer_size);
RowBatch* batch = CreateIntBatch(0, 1024, false);
for (int i = 0; i < batch->num_rows(); ++i) {
bool ret = stream.add_row(batch->get_row(i), &status);
EXPECT_TRUE(ret);
ASSERT_TRUE(status.ok());
}
EXPECT_LT(stream.bytes_in_mem(false), buffer_size);
EXPECT_LT(stream.byte_size(), buffer_size);
ASSERT_TRUE(stream.using_small_buffers());
// 40 MB of ints
batch = CreateIntBatch(0, 10 * 1024 * 1024, false);
for (int i = 0; i < batch->num_rows(); ++i) {
bool ret = stream.add_row(batch->get_row(i), &status);
ASSERT_TRUE(status.ok());
if (!ret) {
ASSERT_TRUE(stream.using_small_buffers());
bool got_buffer;
status = stream.switch_to_io_buffers(&got_buffer);
ASSERT_TRUE(status.ok());
ASSERT_TRUE(got_buffer);
ret = stream.add_row(batch->get_row(i), &status);
ASSERT_TRUE(status.ok());
}
ASSERT_TRUE(ret);
}
EXPECT_EQ(stream.bytes_in_mem(false), buffer_size);
    // TODO: Test for IMPALA-2330. If switch_to_io_buffers() fails to get a buffer,
    // using_small_buffers() should still return true.
stream.close();
}
// Basic API test. No data should be going to disk.
TEST_F(SimpleNullStreamTest, Basic) {
InitBlockMgr(-1, 8 * 1024 * 1024);
TestValues<int>(1, _int_desc, false);
TestValues<int>(10, _int_desc, false);
TestValues<int>(100, _int_desc, false);
TestValues<int>(1, _int_desc, true);
TestValues<int>(10, _int_desc, true);
TestValues<int>(100, _int_desc, true);
TestValues<StringValue>(1, _string_desc, false);
TestValues<StringValue>(10, _string_desc, false);
TestValues<StringValue>(100, _string_desc, false);
TestValues<StringValue>(1, _string_desc, true);
TestValues<StringValue>(10, _string_desc, true);
TestValues<StringValue>(100, _string_desc, true);
TestIntValuesInterleaved(1, 1);
TestIntValuesInterleaved(10, 5);
TestIntValuesInterleaved(100, 15);
}
// Test tuple stream with only 1 buffer and rows with multiple tuples.
TEST_F(MultiTupleStreamTest, MultiTupleOneBufferSpill) {
// Each buffer can only hold 100 ints, so this spills quite often.
int buffer_size = 100 * sizeof(int);
InitBlockMgr(buffer_size, buffer_size);
TestValues<int>(1, _int_desc, false);
TestValues<int>(10, _int_desc, false);
TestValues<StringValue>(1, _string_desc, false);
TestValues<StringValue>(10, _string_desc, false);
}
// Test with a few buffers and rows with multiple tuples.
TEST_F(MultiTupleStreamTest, MultiTupleManyBufferSpill) {
int buffer_size = 100 * sizeof(int);
InitBlockMgr(10 * buffer_size, buffer_size);
TestValues<int>(1, _int_desc, false);
TestValues<int>(10, _int_desc, false);
TestValues<int>(100, _int_desc, false);
TestValues<StringValue>(1, _string_desc, false);
TestValues<StringValue>(10, _string_desc, false);
TestValues<StringValue>(100, _string_desc, false);
TestIntValuesInterleaved(1, 1);
TestIntValuesInterleaved(10, 5);
TestIntValuesInterleaved(100, 15);
}
// Test with rows with multiple nullable tuples.
TEST_F(MultiNullableTupleStreamTest, MultiNullableTupleOneBufferSpill) {
// Each buffer can only hold 100 ints, so this spills quite often.
int buffer_size = 100 * sizeof(int);
InitBlockMgr(buffer_size, buffer_size);
TestValues<int>(1, _int_desc, false);
TestValues<int>(10, _int_desc, false);
TestValues<int>(1, _int_desc, true);
TestValues<int>(10, _int_desc, true);
TestValues<StringValue>(1, _string_desc, false);
TestValues<StringValue>(10, _string_desc, false);
TestValues<StringValue>(1, _string_desc, true);
TestValues<StringValue>(10, _string_desc, true);
}
// Test with a few buffers.
TEST_F(MultiNullableTupleStreamTest, MultiNullableTupleManyBufferSpill) {
int buffer_size = 100 * sizeof(int);
InitBlockMgr(10 * buffer_size, buffer_size);
TestValues<int>(1, _int_desc, false);
TestValues<int>(10, _int_desc, false);
TestValues<int>(100, _int_desc, false);
TestValues<int>(1, _int_desc, true);
TestValues<int>(10, _int_desc, true);
TestValues<int>(100, _int_desc, true);
TestValues<StringValue>(1, _string_desc, false);
TestValues<StringValue>(10, _string_desc, false);
TestValues<StringValue>(100, _string_desc, false);
TestValues<StringValue>(1, _string_desc, true);
TestValues<StringValue>(10, _string_desc, true);
TestValues<StringValue>(100, _string_desc, true);
TestIntValuesInterleaved(1, 1);
TestIntValuesInterleaved(10, 5);
TestIntValuesInterleaved(100, 15);
}
// #endif
#if 0
// Test that deep copy works with arrays by copying into a BufferedTupleStream2, freeing
// the original rows, then reading back the rows and verifying the contents.
TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
Status status;
InitBlockMgr(-1, 8 * 1024 * 1024);
const int NUM_ROWS = 4000;
BufferedTupleStream2 stream(_runtime_state, *_array_desc, _runtime_state->block_mgr2(),
_client, false, false);
const std::vector<TupleDescriptor*>& tuple_descs = _array_desc->tuple_descriptors();
// Write out a predictable pattern of data by iterating over arrays of constants.
    int strings_index = 0; // We take this modulo NUM_STRINGS as an index into STRINGS.
int array_lens[] = { 0, 1, 5, 10, 1000, 2, 49, 20 };
int num_array_lens = sizeof(array_lens) / sizeof(array_lens[0]);
int array_len_index = 0;
for (int i = 0; i < NUM_ROWS; ++i) {
int expected_row_size = tuple_descs[0]->byte_size() + tuple_descs[1]->byte_size();
// gscoped_ptr<TupleRow, FreeDeleter> row(reinterpret_cast<TupleRow*>(
// malloc(tuple_descs.size() * sizeof(Tuple*))));
// gscoped_ptr<Tuple, FreeDeleter> tuple0(reinterpret_cast<Tuple*>(
// malloc(tuple_descs[0]->byte_size())));
// gscoped_ptr<Tuple, FreeDeleter> tuple1(reinterpret_cast<Tuple*>(
// malloc(tuple_descs[1]->byte_size())));
boost::scoped_ptr<TupleRow> row(reinterpret_cast<TupleRow*>(
malloc(tuple_descs.size() * sizeof(Tuple*))));
boost::scoped_ptr<Tuple> tuple0(reinterpret_cast<Tuple*>(
malloc(tuple_descs[0]->byte_size())));
boost::scoped_ptr<Tuple> tuple1(reinterpret_cast<Tuple*>(
malloc(tuple_descs[1]->byte_size())));
memset(tuple0.get(), 0, tuple_descs[0]->byte_size());
memset(tuple1.get(), 0, tuple_descs[1]->byte_size());
row->set_tuple(0, tuple0.get());
row->set_tuple(1, tuple1.get());
// Only array<string> is non-null.
tuple0->set_null(tuple_descs[0]->slots()[1]->null_indicator_offset());
tuple1->set_null(tuple_descs[1]->slots()[0]->null_indicator_offset());
const SlotDescriptor* array_slot_desc = tuple_descs[0]->slots()[0];
const TupleDescriptor* item_desc = array_slot_desc->collection_item_descriptor();
int array_len = array_lens[array_len_index++ % num_array_lens];
CollectionValue* cv = tuple0->GetCollectionSlot(array_slot_desc->tuple_offset());
cv->ptr = NULL;
cv->num_tuples = 0;
CollectionValueBuilder builder(cv, *item_desc, _mem_pool.get(), array_len);
Tuple* array_data;
builder.GetFreeMemory(&array_data);
expected_row_size += item_desc->byte_size() * array_len;
// Fill the array with pointers to our constant strings.
for (int j = 0; j < array_len; ++j) {
const StringValue* string = &STRINGS[strings_index++ % NUM_STRINGS];
array_data->SetNotNull(item_desc->slots()[0]->null_indicator_offset());
RawValue::Write(string, array_data, item_desc->slots()[0], _mem_pool.get());
array_data += item_desc->byte_size();
expected_row_size += string->len;
}
builder.CommitTuples(array_len);
// Check that internal row size computation gives correct result.
EXPECT_EQ(expected_row_size, stream.ComputeRowSize(row.get()));
bool b = stream.add_row(row.get(), &status);
ASSERT_TRUE(b);
ASSERT_TRUE(status.ok());
_mem_pool->FreeAll(); // Free data as soon as possible to smoke out issues.
}
// Read back and verify data.
stream.prepare_for_read(false);
strings_index = 0;
array_len_index = 0;
bool eos = false;
int rows_read = 0;
RowBatch batch(*_array_desc, BATCH_SIZE, _tracker.get());
do {
batch.reset();
ASSERT_TRUE(stream.get_next(&batch, &eos).ok());
for (int i = 0; i < batch.num_rows(); ++i) {
TupleRow* row = batch.GetRow(i);
Tuple* tuple0 = row->get_tuple(0);
Tuple* tuple1 = row->get_tuple(1);
ASSERT_TRUE(tuple0 != NULL);
ASSERT_TRUE(tuple1 != NULL);
const SlotDescriptor* array_slot_desc = tuple_descs[0]->slots()[0];
ASSERT_FALSE(tuple0->IsNull(array_slot_desc->null_indicator_offset()));
ASSERT_TRUE(tuple0->IsNull(tuple_descs[0]->slots()[1]->null_indicator_offset()));
ASSERT_TRUE(tuple1->IsNull(tuple_descs[1]->slots()[0]->null_indicator_offset()));
const TupleDescriptor* item_desc = array_slot_desc->collection_item_descriptor();
int expected_array_len = array_lens[array_len_index++ % num_array_lens];
CollectionValue* cv = tuple0->GetCollectionSlot(array_slot_desc->tuple_offset());
ASSERT_EQ(expected_array_len, cv->num_tuples);
for (int j = 0; j < cv->num_tuples; ++j) {
Tuple* item = reinterpret_cast<Tuple*>(cv->ptr + j * item_desc->byte_size());
const SlotDescriptor* string_desc = item_desc->slots()[0];
ASSERT_FALSE(item->IsNull(string_desc->null_indicator_offset()));
const StringValue* expected = &STRINGS[strings_index++ % NUM_STRINGS];
const StringValue* actual = item->GetStringSlot(string_desc->tuple_offset());
ASSERT_EQ(*expected, *actual);
}
}
rows_read += batch.num_rows();
} while (!eos);
ASSERT_EQ(NUM_ROWS, rows_read);
}
#endif
// TODO: more tests.
// - The stream can operate in many modes
} // namespace doris
int main(int argc, char** argv) {
// std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";
// if (!doris::config::init(conffile.c_str(), false)) {
// fprintf(stderr, "error read config file. \n");
// return -1;
// }
doris::config::query_scratch_dirs = "/tmp";
// doris::config::max_free_io_buffers = 128;
doris::config::read_size = 8388608;
doris::config::min_buffer_size = 1024;
doris::config::disable_mem_pools = false;
doris::init_glog("be-test");
::testing::InitGoogleTest(&argc, argv);
doris::CpuInfo::init();
doris::DiskInfo::init();
return RUN_ALL_TESTS();
}