// blob: 175866c7b009aae8634fa18037a2b0e7a6db1981 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/common/wire_protocol.h"
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <list>
#include <numeric>
#include <optional>
#include <ostream>
#include <random>
#include <string>
#include <tuple>
#include <type_traits>
#include <vector>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/common/column_predicate.h"
#include "kudu/common/columnar_serialization.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/row.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/rowblock_memory.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h"
#include "kudu/util/bitmap.h"
#include "kudu/util/block_bloom_filter.h"
#include "kudu/util/faststring.h"
#include "kudu/util/hash.pb.h"
#include "kudu/util/hexdump.h"
#include "kudu/util/int128.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/random.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h" // IWYU pragma: keep
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
using std::optional;
using std::string;
using std::tuple;
using std::vector;
using strings::Substitute;
namespace kudu {
// Base fixture shared by the wire protocol tests. It supplies a five-column
// schema mixing STRING, INT32 and INT64 columns (two of them nullable) and an
// arena that individual tests may use for scratch data.
class WireProtocolTest : public KuduTest {
 public:
  WireProtocolTest()
      : schema_({ ColumnSchema("string", STRING),
                  ColumnSchema("nullable_string", STRING, /* is_nullable=*/true),
                  ColumnSchema("int", INT32),
                  ColumnSchema("nullable_int", INT32, /* is_nullable=*/true),
                  ColumnSchema("int64", INT64) },
                1),
        test_data_arena_(4096) {
  }

  // Fills 'block' with pseudo-random test rows. The cell accesses below assume
  // the block has the same column layout as 'schema_'.
  //
  // Rows for which rng.OneIn(10) fires are marked unselected and left
  // untouched. For every other row, columns 0, 2 and 4 always get a value,
  // while the nullable columns 1 and 3 are randomly flagged as NULL.
  static void FillRowBlockWithTestRows(RowBlock* block) {
    Random rng(SeedRandom());
    auto* selection = block->selection_vector();
    selection->SetAllTrue();
    for (int row_idx = 0; row_idx < block->nrows(); ++row_idx) {
      if (rng.OneIn(10)) {
        selection->SetRowUnselected(row_idx);
        continue;
      }
      RowBlockRow row = block->row(row_idx);

      // Each row gets its own copy of the string data in the block's arena to
      // keep the workload realistic. Sharing a single Slice between all rows
      // would squeeze the memory accesses into just a few cache lines, and we
      // could end up micro-optimizing for the wrong thing.
      CHECK(block->arena()->RelocateSlice(
          "hello world col0",
          reinterpret_cast<Slice*>(row.mutable_cell_ptr(0))));

      const bool string_is_null = rng.OneIn(3);
      row.cell(1).set_null(string_is_null);
      if (!string_is_null) {
        CHECK(block->arena()->RelocateSlice(
            "hello world col1",
            reinterpret_cast<Slice*>(row.mutable_cell_ptr(1))));
      }

      *reinterpret_cast<int32_t*>(row.mutable_cell_ptr(2)) = row_idx;

      // The nullable int cell is written first and only then (maybe) flagged
      // as NULL.
      *reinterpret_cast<int32_t*>(row.mutable_cell_ptr(3)) = row_idx;
      row.cell(3).set_null(rng.OneIn(7));

      *reinterpret_cast<int64_t*>(row.mutable_cell_ptr(4)) = row_idx;
    }
  }

 protected:
  Schema schema_;
  Arena test_data_arena_;
};
// An OK status must serialize to a PB carrying only the OK code, and must
// deserialize back into an OK status.
TEST_F(WireProtocolTest, TestOKStatus) {
  const Status original = Status::OK();
  AppStatusPB status_pb;
  StatusToPB(original, &status_pb);

  // No optional fields should have been populated.
  EXPECT_EQ(AppStatusPB::OK, status_pb.code());
  EXPECT_FALSE(status_pb.has_message());
  EXPECT_FALSE(status_pb.has_posix_code());

  const Status roundtripped = StatusFromPB(status_pb);
  ASSERT_OK(roundtripped);
}
// A non-OK status without a posix code must round-trip through the PB form
// with its code and message intact.
TEST_F(WireProtocolTest, TestBadStatus) {
  const Status original = Status::NotFound("foo", "bar");
  AppStatusPB status_pb;
  StatusToPB(original, &status_pb);

  EXPECT_EQ(AppStatusPB::NOT_FOUND, status_pb.code());
  // The two message parts are joined into a single PB message field.
  EXPECT_TRUE(status_pb.has_message());
  EXPECT_EQ("foo: bar", status_pb.message());
  EXPECT_FALSE(status_pb.has_posix_code());

  const Status roundtripped = StatusFromPB(status_pb);
  EXPECT_TRUE(roundtripped.IsNotFound());
  EXPECT_EQ(original.ToString(), roundtripped.ToString());
}
// Same as TestBadStatus, but the status additionally carries a posix code,
// which must survive the round trip as well.
TEST_F(WireProtocolTest, TestBadStatusWithPosixCode) {
  const Status original = Status::NotFound("foo", "bar", 1234);
  AppStatusPB status_pb;
  StatusToPB(original, &status_pb);

  EXPECT_EQ(AppStatusPB::NOT_FOUND, status_pb.code());
  EXPECT_TRUE(status_pb.has_message());
  EXPECT_EQ("foo: bar", status_pb.message());
  EXPECT_TRUE(status_pb.has_posix_code());
  EXPECT_EQ(1234, status_pb.posix_code());

  const Status roundtripped = StatusFromPB(status_pb);
  EXPECT_TRUE(roundtripped.IsNotFound());
  EXPECT_EQ(1234, roundtripped.posix_code());
  EXPECT_EQ(original.ToString(), roundtripped.ToString());
}
// Convert the fixture schema to its column PB representation, verify each
// column PB field by field, then convert back and compare with the original.
TEST_F(WireProtocolTest, TestSchemaRoundTrip) {
  google::protobuf::RepeatedPtrField<ColumnSchemaPB> pbs;
  ASSERT_OK(SchemaToColumnPBs(schema_, &pbs));
  ASSERT_EQ(5, pbs.size());

  // What every serialized column is expected to look like, in schema order.
  struct ExpectedColumn {
    bool is_key;
    string name;
    DataType type;
    bool is_nullable;
  };
  const ExpectedColumn kExpectedColumns[] = {
    { true,  "string",          STRING, false },
    { false, "nullable_string", STRING, true  },
    { false, "int",             INT32,  false },
    { false, "nullable_int",    INT32,  true  },
    { false, "int64",           INT64,  false },
  };

  for (int col_idx = 0; col_idx < 5; ++col_idx) {
    // Identify the offending column if any of the expectations below fail.
    SCOPED_TRACE(col_idx);
    const ColumnSchemaPB& actual = pbs.Get(col_idx);
    const ExpectedColumn& expected = kExpectedColumns[col_idx];
    if (expected.is_key) {
      EXPECT_TRUE(actual.is_key());
    } else {
      EXPECT_FALSE(actual.is_key());
    }
    EXPECT_EQ(expected.name, actual.name());
    EXPECT_EQ(expected.type, actual.type());
    if (expected.is_nullable) {
      EXPECT_TRUE(actual.is_nullable());
    } else {
      EXPECT_FALSE(actual.is_nullable());
    }
  }

  // Rebuild a Schema from the PBs; it must match the one we started from.
  Schema roundtripped;
  ASSERT_OK(ColumnPBsToSchema(pbs, &roundtripped));
  EXPECT_EQ(schema_.ToString(), roundtripped.ToString());
  EXPECT_EQ(schema_.num_key_columns(), roundtripped.num_key_columns());
}
// Verify that an error Status is returned when the key columns passed in are
// not contiguous.
TEST_F(WireProtocolTest, TestBadSchema_NonContiguousKey) {
  google::protobuf::RepeatedPtrField<ColumnSchemaPB> pbs;

  // Appends a STRING column with the given name and key flag to 'pbs'.
  const auto add_string_column = [&pbs](const char* name, bool is_key) {
    ColumnSchemaPB* col_pb = pbs.Add();
    col_pb->set_name(name);
    col_pb->set_type(STRING);
    col_pb->set_is_key(is_key);
  };

  add_string_column("c0", /*is_key=*/true);
  add_string_column("c1", /*is_key=*/false);
  // Marking this column as a key after a non-key column is the error.
  add_string_column("c2", /*is_key=*/true);

  Schema schema;
  const Status status = ColumnPBsToSchema(pbs, &schema);
  ASSERT_STR_CONTAINS(status.ToString(), "Got out-of-order key column");
}
// Verify that an error Status is returned when several columns share the
// same name.
TEST_F(WireProtocolTest, TestBadSchema_DuplicateColumnName) {
  google::protobuf::RepeatedPtrField<ColumnSchemaPB> pbs;

  // Appends a STRING column with the given name and key flag to 'pbs'.
  const auto add_string_column = [&pbs](const char* name, bool is_key) {
    ColumnSchemaPB* col_pb = pbs.Add();
    col_pb->set_name(name);
    col_pb->set_type(STRING);
    col_pb->set_is_key(is_key);
  };

  add_string_column("c0", /*is_key=*/true);
  add_string_column("c1", /*is_key=*/false);
  // Same name as the first column: this is the error.
  add_string_column("c0", /*is_key=*/false);

  Schema schema;
  const Status status = ColumnPBsToSchema(pbs, &schema);
  ASSERT_EQ("Invalid argument: Duplicate column name: c0", status.ToString());
}
// Build a block of rows, serialize it row-wise, and check that decoding the
// serialized form yields exactly the selected rows of the original block.
TEST_F(WireProtocolTest, TestRowBlockToRowwisePB) {
  RowBlockMemory mem(1024);
  RowBlock block(&schema_, 30, &mem);
  FillRowBlockWithTestRows(&block);

  // Serialize the block into the direct (row) and indirect (string) buffers.
  faststring direct;
  faststring indirect;
  const int num_rows = SerializeRowBlock(block, nullptr, &direct, &indirect);
  SCOPED_TRACE("Row data: " + direct.ToString());
  SCOPED_TRACE("Indirect data: " + indirect.ToString());

  // Decode the buffers back into row pointers.
  RowwiseRowBlockPB pb;
  pb.set_num_rows(num_rows);
  vector<const uint8_t*> row_ptrs;
  Slice direct_sidecar = direct;
  ASSERT_OK(ExtractRowsFromRowBlockPB(schema_, pb, indirect,
                                      &direct_sidecar, &row_ptrs));
  ASSERT_EQ(block.selection_vector()->CountSelected(), row_ptrs.size());

  // Only selected rows were serialized, so the decoded rows are indexed
  // separately from the source rows.
  int decoded_idx = 0;
  for (int src_idx = 0; src_idx < block.nrows(); ++src_idx) {
    if (block.selection_vector()->IsRowSelected(src_idx)) {
      ConstContiguousRow decoded_row(&schema_, row_ptrs[decoded_idx]);
      EXPECT_EQ(schema_.DebugRow(block.row(src_idx)),
                schema_.DebugRow(decoded_row));
      ++decoded_idx;
    }
  }
}
// Build several blocks of rows, append them all to one columnar serialized
// batch, and check every selected cell against the serialized column data.
TEST_F(WireProtocolTest, TestRowBlockToColumnarPB) {
  static constexpr int kNumBlocks = 3;
  static constexpr int kBatchSizeBytes = 8192 * 1024;

  // Generate the random source blocks.
  RowBlockMemory mem(1024);
  std::list<RowBlock> blocks;
  for (int block_idx = 0; block_idx < kNumBlocks; ++block_idx) {
    blocks.emplace_back(&schema_, 30, &mem);
    FillRowBlockWithTestRows(&blocks.back());
  }

  // Concatenate all of the blocks into a single columnar batch.
  ColumnarSerializedBatch batch(schema_, schema_, kBatchSizeBytes);
  for (const auto& block : blocks) {
    batch.AddRowBlock(block);
  }

  // The serialized columns must contain the selected rows of all the blocks,
  // in order. 'out_row_idx' tracks the position within the serialized batch.
  ASSERT_EQ(5, batch.columns().size());
  int out_row_idx = 0;
  for (const auto& block : blocks) {
    for (int in_row_idx = 0; in_row_idx < block.nrows(); ++in_row_idx) {
      if (!block.selection_vector()->IsRowSelected(in_row_idx)) {
        continue;
      }
      SCOPED_TRACE(in_row_idx);
      SCOPED_TRACE(out_row_idx);
      const auto& src_row = block.row(in_row_idx);
      for (int col_idx = 0; col_idx < schema_.num_columns(); ++col_idx) {
        SCOPED_TRACE(col_idx);
        const auto& col_schema = schema_.column(col_idx);
        const auto& serialized = batch.columns()[col_idx];

        // For nullable columns, a cleared bit in the serialized non-null
        // bitmap must correspond to a NULL source cell. NULL cells carry no
        // value worth comparing.
        if (col_schema.is_nullable()) {
          const bool expect_null = src_row.is_null(col_idx);
          EXPECT_EQ(!BitmapTest(serialized.non_null_bitmap->data(), out_row_idx),
                    expect_null);
          if (expect_null) {
            continue;
          }
        }

        const int type_size = col_schema.type_info()->size();
        const bool is_binary = col_schema.type_info()->physical_type() == BINARY;
        Slice serialized_val;
        Slice orig_val;
        if (!is_binary) {
          // Fixed-width values are stored back to back in the data buffer.
          serialized_val = Slice(serialized.data.data() + type_size * out_row_idx,
                                 type_size);
          orig_val = Slice(src_row.cell_ptr(col_idx), type_size);
        } else {
          // For binary columns the data buffer holds 32-bit offsets into the
          // varlen buffer; row N spans offsets [N, N + 1).
          const uint8_t* offset_ptr =
              serialized.data.data() + sizeof(uint32_t) * out_row_idx;
          const uint32_t start_offset = UnalignedLoad<uint32_t>(offset_ptr);
          const uint32_t end_offset =
              UnalignedLoad<uint32_t>(offset_ptr + sizeof(uint32_t));
          ASSERT_GE(end_offset, start_offset);
          serialized_val = Slice(serialized.varlen_data->data() + start_offset,
                                 end_offset - start_offset);
          // The source cell itself holds a Slice; copy it out byte-wise.
          memcpy(&orig_val, src_row.cell_ptr(col_idx), type_size);
        }
        EXPECT_EQ(orig_val, serialized_val);
      }
      ++out_row_idx;
    }
  }
}
// Create a block of rows in columnar layout, serialize it row-wise through a
// reordered projection with UNIXTIME_MICROS padding enabled, and verify the
// padded layout by reading each field back at its expected byte offset.
TEST_F(WireProtocolTest, TestColumnarRowBlockToPBWithPadding) {
  constexpr int kNumRows = 10;
  RowBlockMemory mem(1024);
  // Create a schema with multiple UNIXTIME_MICROS columns in different
  // positions.
  Schema tablet_schema({ ColumnSchema("key", UNIXTIME_MICROS),
                         ColumnSchema("col1", STRING),
                         ColumnSchema("col2", UNIXTIME_MICROS),
                         ColumnSchema("col3", INT32, true /* nullable */),
                         ColumnSchema("col4", UNIXTIME_MICROS, true /* nullable */)}, 1);
  RowBlock block(&tablet_schema, kNumRows, &mem);
  block.selection_vector()->SetAllTrue();
  for (int i = 0; i < block.nrows(); i++) {
    RowBlockRow row = block.row(i);
    *reinterpret_cast<int64_t*>(row.mutable_cell_ptr(0)) = i;
    Slice col1;
    // See: FillRowBlockWithTestRows() for the reason why we relocate these
    // to 'test_data_arena_'.
    CHECK(test_data_arena_.RelocateSlice("hello world col1", &col1));
    *reinterpret_cast<Slice*>(row.mutable_cell_ptr(1)) = col1;
    *reinterpret_cast<int64_t*>(row.mutable_cell_ptr(2)) = i;
    *reinterpret_cast<int32_t*>(row.mutable_cell_ptr(3)) = i;
    row.cell(3).set_null(false);
    *reinterpret_cast<int64_t*>(row.mutable_cell_ptr(4)) = i;
    row.cell(4).set_null(true);
  }
  // Have the projection schema have columns in a different order from the table schema.
  Schema proj_schema({ ColumnSchema("col1", STRING),
                       ColumnSchema("key", UNIXTIME_MICROS),
                       ColumnSchema("col2", UNIXTIME_MICROS),
                       ColumnSchema("col4", UNIXTIME_MICROS, true /* nullable */),
                       ColumnSchema("col3", INT32, true /* nullable */)}, 0);
  // Convert to PB.
  faststring direct, indirect;
  int num_rows = SerializeRowBlock(block, &proj_schema, &direct, &indirect,
                                   true /* pad timestamps */);
  SCOPED_TRACE("Row data: " + HexDump(direct));
  SCOPED_TRACE("Indirect data: " + HexDump(indirect));
  // Convert back to a row, ensure that the resulting row is the same
  // as the one we put in. Can't reuse the decoding methods since we
  // won't support decoding padded rows within Kudu.
  RowwiseRowBlockPB pb;
  pb.set_num_rows(num_rows);
  Slice direct_sidecar = direct;
  Slice indirect_sidecar = indirect;
  ASSERT_OK(RewriteRowBlockPointers(proj_schema, pb, indirect_sidecar, &direct_sidecar, true));
  // Row stride is the normal size for the schema + the number of UNIXTIME_MICROS columns * 8,
  // the size of the padding per column.
  const size_t row_stride = ContiguousRowHelper::row_size(proj_schema) + 3 * 8;
  ASSERT_EQ(direct_sidecar.size(), row_stride * kNumRows);
  for (int i = 0; i < kNumRows; i++) {
    const uint8_t* base_data = direct_sidecar.data() + i * row_stride;
    // With padding, the null bitmap is at offset 68.
    // See the calculations below to understand why.
    // As the expectations below show, a set bit in this bitmap means that the
    // corresponding cell is NULL.
    const uint8_t* null_bitmap = base_data + 68;

    // NOTE: rows are packed back to back and 'row_stride' is not guaranteed
    // to be a multiple of the field alignment, so the fields are read with
    // memcpy()/UnalignedLoad() rather than by dereferencing casted pointers.

    // 'col1' comes at 0 bytes offset in the projection schema.
    Slice col1;
    memcpy(&col1, base_data, sizeof(col1));
    ASSERT_EQ(Slice("hello world col1"), col1)
        << "Unexpected val for the " << i << "th row:" << col1.ToDebugString();
    // 'key' comes at 16 bytes offset.
    const int64_t key = UnalignedLoad<int64_t>(base_data + 16);
    EXPECT_EQ(key, i);
    // 'col2' comes at 32 bytes offset: 16 bytes previous, 16 bytes 'key'
    const int64_t col2 = UnalignedLoad<int64_t>(base_data + 32);
    EXPECT_EQ(col2, i);
    // 'col4' is supposed to be null, but should also read 0 since we memsetted the
    // memory to 0. It should come at 48 bytes offset:  32 bytes previous + 8 bytes 'col2' +
    // 8 bytes padding.
    const int64_t col4 = UnalignedLoad<int64_t>(base_data + 48);
    EXPECT_EQ(col4, 0);
    EXPECT_TRUE(BitmapTest(null_bitmap, 3));
    // 'col3' comes at 64 bytes offset: 48 bytes previous, 8 bytes 'col4', 8 bytes padding
    const int32_t col3 = UnalignedLoad<int32_t>(base_data + 64);
    EXPECT_EQ(col3, i);
    EXPECT_FALSE(BitmapTest(null_bitmap, 4));
  }
}
// Benchmark policy that serializes a RowBlock into the row-wise format. The
// output buffers are discarded: only the cost of the conversion matters.
struct RowwiseConverter {
  static constexpr const char* kName = "row-wise";

  // Serializes 'block' once, without applying any projection.
  static void Run(const RowBlock& block) {
    faststring direct, indirect;
    SerializeRowBlock(block, nullptr, &direct, &indirect);
  }
};
struct ColumnarConverter {
static void Run(const RowBlock& block) {
constexpr int kBatchSizeBytes = 8192 * 1024;
ColumnarSerializedBatch batch(*block.schema(), *block.schema(), kBatchSizeBytes);
batch.AddRowBlock(block);
}
static constexpr const char* kName = "columnar";
};
// Describes the set of columns used by one benchmark configuration.
//
// NOTE: both structs are brace-initialized elsewhere in this file, so the
// order of their members must not change.
struct BenchmarkColumnsSpec {
  // Description of a single benchmark column.
  struct Col {
    // Data type of the column.
    DataType type;
    // Fraction of the cells to fill with NULLs. A value >= 0 makes the column
    // nullable (even when it is 0, i.e. no NULLs are actually generated).
    double null_fraction; // negative for non-null
  };
  // The columns, in schema order.
  vector<Col> columns;
  // Name of the spec; used in the benchmark log output and test names.
  string name;
};
// Parameterized fixture benchmarking the conversion of RowBlocks to their
// serialized forms. The parameter is a (columns spec, row selection rate)
// tuple.
class WireProtocolBenchmark :
    public WireProtocolTest,
    public testing::WithParamInterface<tuple<BenchmarkColumnsSpec, double>> {
 public:
  // Rebuilds 'benchmark_schema_' from 'spec'. Columns are named "col<N>" and
  // are nullable iff their null fraction is non-negative. The schema has no
  // key columns.
  void ResetBenchmarkSchema(const BenchmarkColumnsSpec& spec) {
    vector<ColumnSchema> column_schemas;
    int i = 0;
    for (const auto& c : spec.columns) {
      column_schemas.emplace_back(Substitute("col$0", i++),
                                  c.type,
                                  /*nullable=*/c.null_fraction >= 0);
    }
    CHECK_OK(benchmark_schema_.Reset(std::move(column_schemas), 0));
  }

  // Fills every row of 'block' according to 'spec'. ResetBenchmarkSchema()
  // must have been called with the same spec beforehand, since the columns of
  // 'benchmark_schema_' and 'spec' are indexed in lockstep.
  //
  // String values are copied into 'test_data_arena_', which is reset first.
  // Crashes on a column type not handled by the switch below.
  void FillRowBlockForBenchmark(const BenchmarkColumnsSpec& spec,
                                RowBlock* block) {
    Random rng(SeedRandom());
    test_data_arena_.Reset();
    for (int i = 0; i < block->nrows(); i++) {
      RowBlockRow row = block->row(i);
      for (int j = 0; j < benchmark_schema_.num_columns(); j++) {
        const ColumnSchema& column_schema = benchmark_schema_.column(j);
        DataType type = spec.columns[j].type;
        // A negative null fraction (non-nullable column) never yields a NULL.
        bool is_null = rng.NextDoubleFraction() <= spec.columns[j].null_fraction;
        if (column_schema.is_nullable()) {
          row.cell(j).set_null(is_null);
        }
        if (!is_null) {
          switch (type) {
            case STRING: {
              Slice col;
              CHECK(test_data_arena_.RelocateSlice(Substitute("hello world $0",
                                                              column_schema.name()), &col));
              memcpy(row.mutable_cell_ptr(j), &col, sizeof(Slice));
              break;
            }
            case INT128:
              UnalignedStore<int128_t>(row.mutable_cell_ptr(j), i);
              break;
            case INT64:
              UnalignedStore<int64_t>(row.mutable_cell_ptr(j), i);
              break;
            case INT32:
              UnalignedStore<int32_t>(row.mutable_cell_ptr(j), i);
              break;
            case INT16:
              UnalignedStore<int16_t>(row.mutable_cell_ptr(j), i);
              break;
            case INT8:
              UnalignedStore<int8_t>(row.mutable_cell_ptr(j), i);
              break;
            default:
              LOG(FATAL) << "Unexpected type: " << type;
          }
        }
      }
    }
  }

  // Marks a random subset of the rows of 'block' as selected and all of the
  // other rows as unselected. 'rate' must be within [0, 1]; exactly
  // floor(nrows * rate) rows end up selected.
  static void SelectRandomRowsWithRate(RowBlock* block, double rate) {
    CHECK_LE(rate, 1.0);
    CHECK_GE(rate, 0.0);
    // Truncate to a whole number of rows once, up front. Carrying the count
    // around as a double would make the final CHECK_EQ fail spuriously for
    // any rate that doesn't yield an integral number of rows.
    const auto select_count = static_cast<size_t>(block->nrows() * rate);
    SelectionVector* select_vector = block->selection_vector();
    if (rate == 1.0) {
      select_vector->SetAllTrue();
    } else if (rate == 0.0) {
      select_vector->SetAllFalse();
    } else {
      // Shuffle all of the row indexes and select the first 'select_count'.
      vector<int> indexes(block->nrows());
      std::iota(indexes.begin(), indexes.end(), 0);
      std::mt19937 gen(SeedRandom());
      std::shuffle(indexes.begin(), indexes.end(), gen);
      indexes.resize(select_count);
      select_vector->SetAllFalse();
      for (auto index : indexes) {
        select_vector->SetRowSelected(index);
      }
    }
    CHECK_EQ(select_vector->CountSelected(), select_count);
  }

  // Runs the conversion benchmark using 'Converter' (which provides a static
  // Run(const RowBlock&) method and a kName constant) and returns the
  // measured cost in cycles per cell.
  //
  // Use 'spec' to control the shape of the schema.
  // Use 'select_rate' to control the number of selected rows; it must select
  // at least one cell per block.
  template<class Converter>
  double RunBenchmark(const BenchmarkColumnsSpec& spec,
                      double select_rate) {
    ResetBenchmarkSchema(spec);
    RowBlockMemory mem(1024);
    RowBlock block(&benchmark_schema_, 1000, &mem);
    // Regardless of the config, use a constant number of selected cells for the test by
    // looping the conversion an appropriate number of times.
    const int64_t kNumCellsToConvert = AllowSlowTests() ? 100000000 : 1000000;
    const int64_t kCellsPerBlock = block.nrows() * spec.columns.size();
    const double kSelectedCellsPerBlock = kCellsPerBlock * select_rate;
    // Guard the division below: with no selected cells the quotient is
    // infinite, and converting it to an integer is undefined.
    CHECK_GT(kSelectedCellsPerBlock, 0.0);
    const int kNumTrials = static_cast<int>(kNumCellsToConvert / kSelectedCellsPerBlock);
    FillRowBlockForBenchmark(spec, &block);
    SelectRandomRowsWithRate(&block, select_rate);
    int64_t cycle_start = CycleClock::Now();
    for (int i = 0; i < kNumTrials; ++i) {
      Converter::Run(block);
    }
    int64_t cycle_end = CycleClock::Now();
    double cycles_per_cell = static_cast<double>(cycle_end - cycle_start) / kNumCellsToConvert;
    LOG(INFO) << Substitute(
        "Converting $0 to PB (method $3) row select rate $1: $2 cycles/cell",
        spec.name, select_rate, cycles_per_cell,
        Converter::kName);
    return cycles_per_cell;
  }

 protected:
  // Schema in use by the current benchmark run; see ResetBenchmarkSchema().
  Schema benchmark_schema_;
};
// Benchmark both serialization formats on the same configuration and log how
// they compare.
TEST_P(WireProtocolBenchmark, TestRowBlockToPBBenchmark) {
  const auto& [spec, select_rate] = GetParam();

  const double rowwise_cycles = RunBenchmark<RowwiseConverter>(spec, select_rate);
  const double columnar_cycles = RunBenchmark<ColumnarConverter>(spec, select_rate);

  // Throughput is inversely proportional to the cycles spent per cell, hence
  // the row-wise cost goes into the numerator.
  const double throughput_ratio = rowwise_cycles / columnar_cycles;
  LOG(INFO) << Substitute(
      "Converting $0 to PB row select rate $1: columnar/rowwise throughput ratio: $2x",
      spec.name, select_rate, throughput_ratio);
}
// Builds a benchmark spec made of 'n_cols' identical columns of the given
// type and null fraction (negative meaning non-nullable).
//
// The spec is named "<n_cols>_<type name>_<null description>", where the null
// description is either "non_null" or the null percentage followed by
// "pct_null".
BenchmarkColumnsSpec UniformColumns(int n_cols, DataType type, double null_fraction) {
  const bool nullable = null_fraction >= 0;
  const string null_desc = nullable
      ? Substitute("$0pct_null", static_cast<int>(null_fraction * 100))
      : string("non_null");

  // Every column is a copy of the same prototype.
  const BenchmarkColumnsSpec::Col prototype = {type, null_fraction};
  vector<BenchmarkColumnsSpec::Col> cols(n_cols, prototype);

  return {cols, Substitute("$0_$1_$2",
                           n_cols,
                           GetTypeInfo(type)->name(),
                           null_desc) };
}
// Instantiate the benchmark for every combination of the column specs and row
// selection rates below. Each test is named after its spec, followed by the
// selection rate as a percentage.
INSTANTIATE_TEST_SUITE_P(
    ColumnarRowBlockToPBBenchmarkParams, WireProtocolBenchmark,
    testing::Combine(
        // Column specs.
        testing::Values(
            // Non-nullable columns.
            UniformColumns(10, INT64, -1),
            UniformColumns(10, INT32, -1),
            UniformColumns(10, STRING, -1),
            // Nullable columns with a null fraction of 0.
            UniformColumns(10, INT128, 0),
            UniformColumns(10, INT64, 0),
            UniformColumns(10, INT32, 0),
            UniformColumns(10, INT16, 0),
            UniformColumns(10, INT8, 0),
            UniformColumns(10, STRING, 0),
            // Nullable columns with a null fraction of 0.1.
            UniformColumns(10, INT128, 0.1),
            UniformColumns(10, INT64, 0.1),
            UniformColumns(10, INT32, 0.1),
            UniformColumns(10, INT16, 0.1),
            UniformColumns(10, INT8, 0.1),
            UniformColumns(10, STRING, 0.1)),
        // Selection rates.
        testing::Values(1.0, 0.8, 0.5, 0.2)),
    [](const testing::TestParamInfo<WireProtocolBenchmark::ParamType>& param_info) {
      const auto& [spec, select_rate] = param_info.param;
      const int select_pct = static_cast<int>(select_rate*100);
      return Substitute("$0_sel_$1pct", spec.name, select_pct);
    });
// Verify that extracting rows from an invalid row block yields a Corruption
// status instead of succeeding.
TEST_F(WireProtocolTest, TestInvalidRowBlock) {
  Schema schema({ ColumnSchema("col1", STRING) }, 1);
  RowwiseRowBlockPB pb;
  vector<const uint8_t*> row_ptrs;

  // Case 1: the direct data is too short to hold the single advertised row.
  pb.set_num_rows(1);
  Slice direct("x");
  const Status too_short =
      ExtractRowsFromRowBlockPB(schema, pb, Slice(), &direct, &row_ptrs);
  ASSERT_STR_CONTAINS(too_short.ToString(), "Corruption: Row block has 1 bytes of data");

  // Case 2: the direct data is long enough, but it is paired with empty
  // indirect data, so its string slice cannot be resolved.
  pb.set_num_rows(1);
  direct = Slice("xxxxxxxxxxxxxxxx");
  const Status bad_pointer =
      ExtractRowsFromRowBlockPB(schema, pb, Slice(), &direct, &row_ptrs);
  ASSERT_STR_CONTAINS(bad_pointer.ToString(),
                      "Corruption: Row #0 contained bad indirect slice");
}
// Serialize a block that has a selection vector but no columns at all. A scan
// with an empty projection (i.e. a COUNT(*) query) returns this kind of
// result.
TEST_F(WireProtocolTest, TestBlockWithNoColumns) {
  Schema empty(std::vector<ColumnSchema>(), 0);
  RowBlockMemory mem(1024);
  RowBlock block(&empty, 1000, &mem);
  auto* selection = block.selection_vector();
  selection->SetAllTrue();

  // Unselect 100 rows: every even-numbered row below 200.
  for (int row_idx = 0; row_idx < 200; row_idx += 2) {
    selection->SetRowUnselected(row_idx);
  }
  ASSERT_EQ(900, selection->CountSelected());

  // The serialized row count must only account for the selected rows.
  faststring direct;
  faststring indirect;
  const int num_rows = SerializeRowBlock(block, nullptr, &direct, &indirect);
  ASSERT_EQ(900, num_rows);
}
// Round-trip column schemas with various combinations of read and write
// defaults through their PB form, for both a STRING and a UINT32 column.
TEST_F(WireProtocolTest, TestColumnDefaultValue) {
  Slice write_default_str("Hello Write");
  Slice read_default_str("Hello Read");
  uint32_t write_default_u32 = 512;
  uint32_t read_default_u32 = 256;

  // The same PB object is deliberately reused for all of the cases below.
  ColumnSchemaPB pb;

  {  // STRING column without any defaults.
    ColumnSchema col("col1", STRING);
    ColumnSchemaToPB(col, &pb);
    optional<ColumnSchema> from_pb;
    ASSERT_OK(ColumnSchemaFromPB(pb, &from_pb));
    ASSERT_FALSE(from_pb->has_read_default());
    ASSERT_FALSE(from_pb->has_write_default());
    ASSERT_TRUE(from_pb->read_default_value() == nullptr);
  }

  {  // STRING column with a read default only.
    ColumnSchema col("col2", STRING, false, false, &read_default_str);
    ColumnSchemaToPB(col, &pb);
    optional<ColumnSchema> from_pb;
    ASSERT_OK(ColumnSchemaFromPB(pb, &from_pb));
    ASSERT_TRUE(from_pb->has_read_default());
    ASSERT_FALSE(from_pb->has_write_default());
    ASSERT_EQ(read_default_str, *static_cast<const Slice *>(from_pb->read_default_value()));
    ASSERT_EQ(nullptr, static_cast<const Slice *>(from_pb->write_default_value()));
  }

  {  // STRING column with both read and write defaults.
    ColumnSchema col("col3", STRING, false, false, &read_default_str, &write_default_str);
    ColumnSchemaToPB(col, &pb);
    optional<ColumnSchema> from_pb;
    ASSERT_OK(ColumnSchemaFromPB(pb, &from_pb));
    ASSERT_TRUE(from_pb->has_read_default());
    ASSERT_TRUE(from_pb->has_write_default());
    ASSERT_EQ(read_default_str, *static_cast<const Slice *>(from_pb->read_default_value()));
    ASSERT_EQ(write_default_str, *static_cast<const Slice *>(from_pb->write_default_value()));
  }

  {  // UINT32 column with a read default only.
    ColumnSchema col("col4", UINT32, false, false, &read_default_u32);
    ColumnSchemaToPB(col, &pb);
    optional<ColumnSchema> from_pb;
    ASSERT_OK(ColumnSchemaFromPB(pb, &from_pb));
    ASSERT_TRUE(from_pb->has_read_default());
    ASSERT_FALSE(from_pb->has_write_default());
    ASSERT_EQ(read_default_u32, *static_cast<const uint32_t *>(from_pb->read_default_value()));
    ASSERT_EQ(nullptr, static_cast<const uint32_t *>(from_pb->write_default_value()));
  }

  {  // UINT32 column with both read and write defaults.
    ColumnSchema col("col5", UINT32, false, false, &read_default_u32, &write_default_u32);
    ColumnSchemaToPB(col, &pb);
    optional<ColumnSchema> from_pb;
    ASSERT_OK(ColumnSchemaFromPB(pb, &from_pb));
    ASSERT_TRUE(from_pb->has_read_default());
    ASSERT_TRUE(from_pb->has_write_default());
    ASSERT_EQ(read_default_u32, *static_cast<const uint32_t *>(from_pb->read_default_value()));
    ASSERT_EQ(write_default_u32, *static_cast<const uint32_t *>(from_pb->write_default_value()));
  }
}
// Regression test for KUDU-2378: ColumnSchemaFromPB used to crash when given
// a DECIMAL128 column carrying a 16-byte read default.
TEST_F(WireProtocolTest, TestCrashOnAlignedLoadOf128BitReadDefault) {
  ColumnSchemaPB col_pb;
  col_pb.set_name("col");
  col_pb.set_type(DECIMAL128);
  col_pb.set_read_default_value(string(16, 'a'));

  optional<ColumnSchema> col_schema;
  ASSERT_OK(ColumnSchemaFromPB(col_pb, &col_schema));
}
// Regression test for KUDU-2622: the sizes of the read and write default
// values must be validated. Here, 8 bytes are supplied for a DECIMAL128
// column, which must be rejected as corruption.
TEST_F(WireProtocolTest, TestInvalidReadAndWriteDefault) {
  {  // Undersized read default.
    ColumnSchemaPB col_pb;
    col_pb.set_name("col");
    col_pb.set_type(DECIMAL128);
    col_pb.set_read_default_value(string(8, 'a'));

    optional<ColumnSchema> col_schema;
    const Status status = ColumnSchemaFromPB(col_pb, &col_schema);
    EXPECT_TRUE(status.IsCorruption());
    ASSERT_STR_CONTAINS(status.ToString(),
                        "Corruption: Not enough bytes for decimal: read default");
  }
  {  // Undersized write default.
    ColumnSchemaPB col_pb;
    col_pb.set_name("col");
    col_pb.set_type(DECIMAL128);
    col_pb.set_write_default_value(string(8, 'a'));

    optional<ColumnSchema> col_schema;
    const Status status = ColumnSchemaFromPB(col_pb, &col_schema);
    EXPECT_TRUE(status.IsCorruption());
    ASSERT_STR_CONTAINS(status.ToString(),
                        "Corruption: Not enough bytes for decimal: write default");
  }
}
// Exercise the PB (de)serialization of IN list predicates, including the
// degenerate and the corrupt cases.
TEST_F(WireProtocolTest, TestColumnPredicateInList) {
  ColumnSchema col1("col1", INT32);
  vector<ColumnSchema> cols = { col1 };
  Schema schema(cols, 1);
  RowBlockMemory mem(1024);
  optional<ColumnPredicate> predicate;

  { // col1 IN (5, 6, 10)
    int five = 5;
    int six = 6;
    int ten = 10;
    vector<const void*> values { &five, &six, &ten };
    ColumnPredicate in_list = ColumnPredicate::InList(col1, &values);
    ColumnPredicatePB predicate_pb;
    NO_FATALS(ColumnPredicateToPB(in_list, &predicate_pb));

    ASSERT_OK(ColumnPredicateFromPB(schema, &mem.arena, predicate_pb, &predicate));
    ASSERT_EQ(predicate->predicate_type(), PredicateType::InList);
    ASSERT_EQ(3, predicate->raw_values().size());
  }

  { // col1 IN (0, 0)
    // A single element IN list cannot be constructed directly, as it would
    // get simplified to an equality predicate. Work around that by building
    // the protobuf message by hand.
    ColumnPredicatePB predicate_pb;
    predicate_pb.set_column("col1");
    auto* in_list_values = predicate_pb.mutable_in_list()->mutable_values();
    *in_list_values->Add() = string("\0\0\0\0", 4);
    *in_list_values->Add() = string("\0\0\0\0", 4);

    ASSERT_OK(ColumnPredicateFromPB(schema, &mem.arena, predicate_pb, &predicate));
    ASSERT_EQ(PredicateType::Equality, predicate->predicate_type());
  }

  { // col1 IN ()
    ColumnPredicatePB predicate_pb;
    predicate_pb.set_column("col1");
    predicate_pb.mutable_in_list();

    // This case uses its own memory and output predicate.
    RowBlockMemory local_mem(1024);
    optional<ColumnPredicate> local_predicate;
    ASSERT_OK(ColumnPredicateFromPB(schema, &local_mem.arena, predicate_pb, &local_predicate));
    ASSERT_EQ(PredicateType::None, local_predicate->predicate_type());
  }

  { // IN list corruption
    // The single value is only one byte long, while the column is an INT32.
    ColumnPredicatePB predicate_pb;
    predicate_pb.set_column("col1");
    predicate_pb.mutable_in_list();
    *predicate_pb.mutable_in_list()->mutable_values()->Add() = string("\0", 1);

    // This case uses its own memory and output predicate.
    RowBlockMemory local_mem(1024);
    optional<ColumnPredicate> local_predicate;
    const Status status =
        ColumnPredicateFromPB(schema, &local_mem.arena, predicate_pb, &local_predicate);
    ASSERT_TRUE(status.IsInvalidArgument());
  }
}
class BFWireProtocolTest : public KuduTest {
public:
BFWireProtocolTest()
: schema_({ ColumnSchema("col1", INT32)}, 1),
arena_(1024),
allocator_(&arena_),
n_keys_(100),
b1_(&allocator_),
b2_(&allocator_) {}
void SetUp() override {
int log_space_bytes1 = BlockBloomFilter::MinLogSpace(n_keys_, 0.01);
ASSERT_OK(b1_.Init(log_space_bytes1, FAST_HASH, 0));
ASSERT_LE(BlockBloomFilter::FalsePositiveProb(n_keys_, log_space_bytes1), 0.01);
int log_space_bytes2 = BlockBloomFilter::MinLogSpace(n_keys_, 0.01);
ASSERT_OK(b2_.Init(log_space_bytes2, FAST_HASH, 0));
ASSERT_LE(BlockBloomFilter::FalsePositiveProb(n_keys_, log_space_bytes2), 0.01);
for (int i = 0; i < n_keys_; ++i) {
Slice key_slice(reinterpret_cast<const uint8_t*>(&i), sizeof(i));
b1_.Insert(key_slice);
b2_.Insert(key_slice);
}
}
protected:
Schema schema_;
Arena arena_;
ArenaBlockBloomFilterBufferAllocator allocator_;
int n_keys_;
BlockBloomFilter b1_;
BlockBloomFilter b2_;
};
// Bloom filter predicates without any bounds must survive a round trip
// through their PB form unchanged.
TEST_F(BFWireProtocolTest, TestColumnPredicateBloomFilter) {
  optional<ColumnPredicate> decoded;
  const ColumnSchema col1 = schema_.column(0);

  { // Single BloomFilter predicate.
    auto original = ColumnPredicate::InBloomFilter(col1, {&b1_}, nullptr, nullptr);
    ColumnPredicatePB predicate_pb;
    NO_FATALS(ColumnPredicateToPB(original, &predicate_pb));
    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, predicate_pb, &decoded));
    ASSERT_EQ(decoded->predicate_type(), PredicateType::InBloomFilter);
    ASSERT_EQ(decoded, original);
  }

  { // Multi BloomFilter predicate.
    auto original = ColumnPredicate::InBloomFilter(col1, {&b1_, &b2_}, nullptr, nullptr);
    ColumnPredicatePB predicate_pb;
    NO_FATALS(ColumnPredicateToPB(original, &predicate_pb));
    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, predicate_pb, &decoded));
    ASSERT_EQ(decoded->predicate_type(), PredicateType::InBloomFilter);
    ASSERT_EQ(decoded, original);
  }
}
// Bloom filter predicates with a lower and/or upper bound must survive a
// round trip through their PB form unchanged.
TEST_F(BFWireProtocolTest, TestColumnPredicateBloomFilterWithBound) {
  optional<ColumnPredicate> decoded;
  const ColumnSchema col1 = schema_.column(0);

  { // Single bloom filter with lower bound.
    int lower = 1;
    ColumnPredicate original =
        ColumnPredicate::InBloomFilter(col1, {&b1_}, &lower, nullptr);
    ColumnPredicatePB predicate_pb;
    NO_FATALS(ColumnPredicateToPB(original, &predicate_pb));
    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, predicate_pb, &decoded));
    ASSERT_EQ(decoded->predicate_type(), PredicateType::InBloomFilter);
    ASSERT_EQ(decoded, original);
  }

  { // Single bloom filter with upper bound.
    int upper = 4;
    ColumnPredicate original =
        ColumnPredicate::InBloomFilter(col1, {&b1_}, nullptr, &upper);
    ColumnPredicatePB predicate_pb;
    NO_FATALS(ColumnPredicateToPB(original, &predicate_pb));
    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, predicate_pb, &decoded));
    ASSERT_EQ(decoded->predicate_type(), PredicateType::InBloomFilter);
    ASSERT_EQ(decoded, original);
  }

  { // Single bloom filter with both lower and upper bound.
    int lower = 1;
    int upper = 4;
    ColumnPredicate original =
        ColumnPredicate::InBloomFilter(col1, {&b1_}, &lower, &upper);
    ColumnPredicatePB predicate_pb;
    NO_FATALS(ColumnPredicateToPB(original, &predicate_pb));
    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, predicate_pb, &decoded));
    ASSERT_EQ(decoded->predicate_type(), PredicateType::InBloomFilter);
    ASSERT_EQ(decoded, original);
  }

  { // Multi bloom filter with both lower and upper bound.
    int lower = 1;
    int upper = 4;
    ColumnPredicate original =
        ColumnPredicate::InBloomFilter(col1, {&b1_, &b2_}, &lower, &upper);
    ColumnPredicatePB predicate_pb;
    NO_FATALS(ColumnPredicateToPB(original, &predicate_pb));
    ASSERT_OK(ColumnPredicateFromPB(schema_, &arena_, predicate_pb, &decoded));
    ASSERT_EQ(decoded->predicate_type(), PredicateType::InBloomFilter);
    // Both of the filters must have made it through.
    ASSERT_EQ(decoded->bloom_filters().size(), original.bloom_filters().size());
    ASSERT_EQ(decoded, original);
  }
}
} // namespace kudu