blob: 40831b100bdf5dbbfc7ef8ba98c6e11c09d24b13 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <cstdint>
#include <functional>
#include <memory>
#include <ostream>
#include <random>
#include <string>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/client/client.h"
#include "kudu/client/row_result.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/schema-internal.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/write_op.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/types.h"
#include "kudu/gutil/mathlimits.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/type_traits.h"
#include "kudu/integration-tests/cluster_verifier.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/util/bitmap.h"
#include "kudu/util/char_util.h"
#include "kudu/util/decimal_util.h"
#include "kudu/util/int128.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DEFINE_int32(num_rows_per_tablet, 100, "The number of rows to be inserted into each tablet");
using kudu::cluster::ExternalMiniCluster;
using kudu::cluster::ExternalMiniClusterOptions;
using kudu::client::sp::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
namespace kudu {
namespace client {
static const int kNumTabletServers = 3;
static const int kNumTablets = 3;
static const int kMaxBatchSize = 8 * 1024 * 1024;
template<typename KeyTypeWrapper>
struct SliceKeysTestSetup {
SliceKeysTestSetup()
: max_rows_(MathLimits<int>::kMax),
rows_per_tablet_(std::min(max_rows_/ kNumTablets, FLAGS_num_rows_per_tablet)),
increment_(static_cast<int>(MathLimits<int>::kMax / kNumTablets)) {
}
void AddKeyColumnsToSchema(KuduSchemaBuilder* builder) const {
auto column_spec = builder->AddColumn("key");
column_spec->Type(client::FromInternalDataType(KeyTypeWrapper::kType))
->NotNull()->PrimaryKey();
if (KeyTypeWrapper::kType == DECIMAL32) {
column_spec->Precision(kMaxDecimal32Precision);
} else if (KeyTypeWrapper::kType == DECIMAL64) {
column_spec->Precision(kMaxDecimal64Precision);
} else if (KeyTypeWrapper::kType == DECIMAL128) {
column_spec->Precision(kMaxDecimal128Precision);
}
}
// Split points are calculated by equally partitioning the int64_t key space and then
// using the stringified hexadecimal representation to create the split keys (with
// zero padding).
vector<const KuduPartialRow*> GenerateSplitRows(const KuduSchema& schema) const {
vector<string> splits;
splits.reserve(kNumTablets - 1);
for (int i = 1; i < kNumTablets; i++) {
int split = i * increment_;
splits.push_back(StringPrintf("%08x", split));
}
vector<const KuduPartialRow*> rows;
for (string val : splits) {
Slice slice(val);
KuduPartialRow* row = schema.NewRow();
CHECK_OK(row->SetSliceCopy<TypeTraits<KeyTypeWrapper::kType> >(0, slice));
rows.push_back(row);
}
return rows;
}
Status GenerateRowKey(KuduInsert* insert, int split_idx, int row_idx) const {
int row_key_num = (split_idx * increment_) + row_idx;
string row_key = StringPrintf("%08x", row_key_num);
Slice row_key_slice(row_key);
return insert->mutable_row()->SetSliceCopy<TypeTraits<KeyTypeWrapper::kType> >(0,
row_key_slice);
}
Status VerifyRowKeySlice(Slice row_key_slice, int split_idx, int row_idx) const {
int expected_row_key_num = (split_idx * increment_) + row_idx;
string expected_row_key = StringPrintf("%08x", expected_row_key_num);
Slice expected_row_key_slice(expected_row_key);
if (expected_row_key_slice.compare(row_key_slice) != 0) {
return Status::Corruption(strings::Substitute("Keys didn't match. Expected: $0 Got: $1",
expected_row_key_slice.ToDebugString(),
row_key_slice.ToDebugString()));
}
return Status::OK();
}
Status VerifyRowKey(const KuduRowResult& result, int split_idx, int row_idx) const {
Slice row_key;
RETURN_NOT_OK(result.Get<TypeTraits<KeyTypeWrapper::kType>>(0, &row_key));
return VerifyRowKeySlice(row_key, split_idx, row_idx);
}
Status VerifyRowKeyRaw(const uint8_t* raw_key, int split_idx, int row_idx) const {
Slice row_key = *reinterpret_cast<const Slice*>(raw_key);
return VerifyRowKeySlice(row_key, split_idx, row_idx);
}
int GetRowsPerTablet() const {
return rows_per_tablet_;
}
int GetMaxRows() const {
return max_rows_;
}
vector<string> GetKeyColumns() const {
vector<string> key_col;
key_col.emplace_back("key");
return key_col;
}
int max_rows_;
int rows_per_tablet_;
int increment_;
};
template<typename KeyTypeWrapper>
struct IntKeysTestSetup {
typedef typename TypeTraits<KeyTypeWrapper::kType>::cpp_type CppType;
IntKeysTestSetup()
// If CppType is actually bigger than int (e.g. int64_t) casting the max to int
// returns -1, so we make sure in that case we get max from int directly.
: max_rows_(static_cast<int>(MathLimits<CppType>::kMax) != -1 ?
static_cast<int>(MathLimits<CppType>::kMax) : MathLimits<int>::kMax),
increment_(max_rows_ / kNumTablets),
rows_per_tablet_(std::min(increment_, FLAGS_num_rows_per_tablet)) {
DCHECK(base::is_integral<CppType>::value);
}
void AddKeyColumnsToSchema(KuduSchemaBuilder* builder) const {
auto column_spec = builder->AddColumn("key");
column_spec->Type(client::FromInternalDataType(KeyTypeWrapper::kType))
->NotNull()->PrimaryKey();
if (KeyTypeWrapper::kType == DECIMAL32) {
column_spec->Precision(kMaxDecimal32Precision);
} else if (KeyTypeWrapper::kType == DECIMAL64) {
column_spec->Precision(kMaxDecimal64Precision);
} else if (KeyTypeWrapper::kType == DECIMAL128) {
column_spec->Precision(kMaxDecimal128Precision);
}
}
vector<const KuduPartialRow*> GenerateSplitRows(const KuduSchema& schema) const {
vector<CppType> splits;
splits.reserve(kNumTablets - 1);
for (int64_t i = 1; i < kNumTablets; i++) {
splits.push_back(i * increment_);
}
vector<const KuduPartialRow*> rows;
for (CppType val : splits) {
KuduPartialRow* row = schema.NewRow();
CHECK_OK(row->Set<TypeTraits<KeyTypeWrapper::kType> >(0, val));
rows.push_back(row);
}
return rows;
}
Status GenerateRowKey(KuduInsert* insert, int split_idx, int row_idx) const {
CppType val = (split_idx * increment_) + row_idx;
return insert->mutable_row()->Set<TypeTraits<KeyTypeWrapper::kType> >(0, val);
}
Status VerifyIntRowKey(CppType val, int split_idx, int row_idx) const {
int expected = (split_idx * increment_) + row_idx;
if (val != expected) {
return Status::Corruption(strings::Substitute("Keys didn't match. Expected: $0 Got: $1",
expected, val));
}
return Status::OK();
}
Status VerifyRowKey(const KuduRowResult& result, int split_idx, int row_idx) const {
CppType val;
RETURN_NOT_OK(result.Get<TypeTraits<KeyTypeWrapper::kType>>(0, &val));
return VerifyIntRowKey(val, split_idx, row_idx);
}
Status VerifyRowKeyRaw(const uint8_t* raw_key, int split_idx, int row_idx) const {
CppType val = UnalignedLoad<CppType>(raw_key);
return VerifyIntRowKey(val, split_idx, row_idx);
}
int GetRowsPerTablet() const {
return rows_per_tablet_;
}
int GetMaxRows() const {
return max_rows_;
}
vector<string> GetKeyColumns() const {
vector<string> key_col;
key_col.emplace_back("key");
return key_col;
}
int max_rows_;
int increment_;
int rows_per_tablet_;
};
struct DateKeysTestSetup {
DateKeysTestSetup()
: min_value(DataTypeTraits<DATE>::kMinValue),
max_rows_(DataTypeTraits<DATE>::kMaxValue - min_value),
increment_(max_rows_ / kNumTablets),
rows_per_tablet_(std::min(increment_, FLAGS_num_rows_per_tablet)) {
}
void AddKeyColumnsToSchema(KuduSchemaBuilder* builder) const {
auto column_spec = builder->AddColumn("key");
column_spec->Type(KuduColumnSchema::DATE)
->NotNull()->PrimaryKey();
}
vector<const KuduPartialRow*> GenerateSplitRows(const KuduSchema& schema) const {
vector<int> splits;
splits.reserve(kNumTablets - 1);
for (int64_t i = 1; i < kNumTablets; i++) {
splits.push_back(min_value + i * increment_);
}
vector<const KuduPartialRow*> rows;
for (int val : splits) {
KuduPartialRow* row = schema.NewRow();
CHECK_OK(row->SetDate(0, val));
rows.push_back(row);
}
return rows;
}
Status GenerateRowKey(KuduInsert* insert, int split_idx, int row_idx) const {
int val = min_value + (split_idx * increment_) + row_idx;
return insert->mutable_row()->SetDate(0, val);
}
Status VerifyIntRowKey(int val, int split_idx, int row_idx) const {
int expected = min_value + (split_idx * increment_) + row_idx;
if (val != expected) {
return Status::Corruption(strings::Substitute("Keys didn't match. Expected: $0 Got: $1",
expected, val));
}
return Status::OK();
}
Status VerifyRowKey(const KuduRowResult& result, int split_idx, int row_idx) const {
int val;
RETURN_NOT_OK(result.GetDate(0, &val));
return VerifyIntRowKey(val, split_idx, row_idx);
}
Status VerifyRowKeyRaw(const uint8_t* raw_key, int split_idx, int row_idx) const {
int val = UnalignedLoad<int32_t>(raw_key);
return VerifyIntRowKey(val, split_idx, row_idx);
}
int GetRowsPerTablet() const {
return rows_per_tablet_;
}
int GetMaxRows() const {
return max_rows_;
}
vector<string> GetKeyColumns() const {
vector<string> key_col;
key_col.emplace_back("key");
return key_col;
}
int min_value;
int max_rows_;
int increment_;
int rows_per_tablet_;
};
struct ExpectedVals {
int8_t expected_int8_val;
int16_t expected_int16_val;
int32_t expected_int32_val;
int64_t expected_int64_val;
int64_t expected_timestamp_val;
int32_t expected_date_val;
string slice_content;
Slice expected_slice_val;
Slice expected_binary_val;
Slice expected_varchar_val;
bool expected_bool_val;
float expected_float_val;
double expected_double_val;
int128_t expected_decimal_val;
};
// Integration that writes, scans and verifies all types.
template <class TestSetup>
class AllTypesItest : public KuduTest {
public:
AllTypesItest() {
if (AllowSlowTests()) {
FLAGS_num_rows_per_tablet = 10000;
}
SeedRandom();
setup_ = TestSetup();
}
// Builds a schema that includes all (frontend) supported types.
// The key is templated so that we can try different key types.
void CreateAllTypesSchema() {
KuduSchemaBuilder builder;
setup_.AddKeyColumnsToSchema(&builder);
builder.AddColumn("int8_val")->Type(KuduColumnSchema::INT8);
builder.AddColumn("int16_val")->Type(KuduColumnSchema::INT16);
builder.AddColumn("int32_val")->Type(KuduColumnSchema::INT32);
builder.AddColumn("int64_val")->Type(KuduColumnSchema::INT64);
builder.AddColumn("timestamp_val")->Type(KuduColumnSchema::UNIXTIME_MICROS);
builder.AddColumn("date_val")->Type(KuduColumnSchema::DATE);
builder.AddColumn("string_val")->Type(KuduColumnSchema::STRING);
builder.AddColumn("varchar_val")->Type(KuduColumnSchema::VARCHAR)->Length(kMaxVarcharLength);
builder.AddColumn("bool_val")->Type(KuduColumnSchema::BOOL);
builder.AddColumn("float_val")->Type(KuduColumnSchema::FLOAT);
builder.AddColumn("double_val")->Type(KuduColumnSchema::DOUBLE);
builder.AddColumn("binary_val")->Type(KuduColumnSchema::BINARY);
builder.AddColumn("decimal32_val")->Type(KuduColumnSchema::DECIMAL)
->Precision(kMaxDecimal32Precision);
builder.AddColumn("decimal64_val")->Type(KuduColumnSchema::DECIMAL)
->Precision(kMaxDecimal64Precision);
builder.AddColumn("decimal128_val")->Type(KuduColumnSchema::DECIMAL)
->Precision(kMaxDecimal128Precision);
CHECK_OK(builder.Build(&schema_));
}
Status CreateCluster() {
static const vector<string> kTsFlags = {
// Set the flush threshold low so that we have flushes and test the on-disk formats.
"--flush_threshold_mb=1",
// Set the major delta compaction ratio low enough that we trigger a lot of them.
"--tablet_delta_store_major_compact_min_ratio=0.001",
// TODO(KUDU-1346) Remove custom consensus_max_batch_size_bytes setting
// once KUDU-1346 is fixed. It's necessary to change the default
// value of the consensus_max_batch_size_bytes flag to avoid
// triggering debug assert when a relatively big chunk of write operations
// is flushed to the tablet server.
"--consensus_max_batch_size_bytes=2097152",
};
ExternalMiniClusterOptions opts;
opts.num_tablet_servers = kNumTabletServers;
for (const std::string& flag : kTsFlags) {
opts.extra_tserver_flags.push_back(flag);
}
cluster_.reset(new ExternalMiniCluster(std::move(opts)));
RETURN_NOT_OK(cluster_->Start());
return cluster_->CreateClient(nullptr, &client_);
}
Status CreateTable() {
CreateAllTypesSchema();
vector<const KuduPartialRow*> split_rows = setup_.GenerateSplitRows(schema_);
unique_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
for (const KuduPartialRow* row : split_rows) {
split_rows_.push_back(*row);
}
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wdeprecated-declarations"
RETURN_NOT_OK(table_creator->table_name("all-types-table")
.schema(&schema_)
.set_range_partition_columns({ "key" })
.split_rows(split_rows)
.num_replicas(kNumTabletServers)
.Create());
#pragma GCC diagnostic pop
return client_->OpenTable("all-types-table", &table_);
}
Status GenerateRow(KuduSession* session, int split_idx, int row_idx) {
KuduInsert* insert = table_->NewInsert();
RETURN_NOT_OK(setup_.GenerateRowKey(insert, split_idx, row_idx));
int int_val = (split_idx * setup_.GetRowsPerTablet()) + row_idx;
KuduPartialRow* row = insert->mutable_row();
RETURN_NOT_OK(row->SetInt8("int8_val", int_val));
RETURN_NOT_OK(row->SetInt16("int16_val", int_val));
RETURN_NOT_OK(row->SetInt32("int32_val", int_val));
RETURN_NOT_OK(row->SetInt64("int64_val", int_val));
RETURN_NOT_OK(row->SetUnixTimeMicros("timestamp_val", int_val));
RETURN_NOT_OK(row->SetDate("date_val", int_val));
string content = strings::Substitute("hello $0", int_val);
Slice slice_val(content);
RETURN_NOT_OK(row->SetStringCopy("string_val", slice_val));
RETURN_NOT_OK(row->SetBinaryCopy("binary_val", slice_val));
RETURN_NOT_OK(row->SetVarchar("varchar_val", slice_val));
double double_val = int_val;
RETURN_NOT_OK(row->SetDouble("double_val", double_val));
RETURN_NOT_OK(row->SetFloat("float_val", double_val));
RETURN_NOT_OK(row->SetBool("bool_val", int_val % 2));
RETURN_NOT_OK(row->SetUnscaledDecimal("decimal32_val", int_val));
RETURN_NOT_OK(row->SetUnscaledDecimal("decimal64_val", int_val));
RETURN_NOT_OK(row->SetUnscaledDecimal("decimal128_val", int_val));
VLOG(1) << "Inserting row[" << split_idx << "," << row_idx << "]" << insert->ToString();
RETURN_NOT_OK(session->Apply(insert));
return Status::OK();
}
// This inserts kNumRowsPerTablet in each of the tablets. In the end we should have
// perfectly partitioned table, if the encoding of the keys was correct and the rows
// ended up in the right place.
Status InsertRows() {
shared_ptr<KuduSession> session = client_->NewSession();
RETURN_NOT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND));
const int max_rows_per_tablet = setup_.GetRowsPerTablet();
for (int i = 0; i < kNumTablets; ++i) {
for (int j = 0; j < max_rows_per_tablet; ++j) {
RETURN_NOT_OK(GenerateRow(session.get(), i, j));
}
}
RETURN_NOT_OK(session->Flush());
return Status::OK();
}
void SetupProjection(vector<string>* projection) {
vector<string> keys = setup_.GetKeyColumns();
for (const string& key : keys) {
projection->push_back(key);
}
projection->push_back("int8_val");
projection->push_back("int16_val");
projection->push_back("int32_val");
projection->push_back("int64_val");
projection->push_back("timestamp_val");
projection->push_back("date_val");
projection->push_back("string_val");
projection->push_back("binary_val");
projection->push_back("varchar_val");
projection->push_back("double_val");
projection->push_back("float_val");
projection->push_back("bool_val");
projection->push_back("decimal32_val");
projection->push_back("decimal64_val");
projection->push_back("decimal128_val");
}
ExpectedVals GetExpectedValsForRow(int split_idx, int row_idx) {
ExpectedVals vals;
int64_t expected_int_val = (split_idx * setup_.GetRowsPerTablet()) + row_idx;
vals.expected_int8_val = static_cast<int8_t>(expected_int_val);
vals.expected_int16_val = static_cast<int16_t>(expected_int_val);
vals.expected_int32_val = static_cast<int32_t>(expected_int_val);
vals.expected_int64_val = expected_int_val;
vals.expected_timestamp_val = expected_int_val;
vals.expected_date_val = static_cast<int32_t>(expected_int_val);
vals.slice_content = strings::Substitute("hello $0", expected_int_val);
vals.expected_slice_val = Slice(vals.slice_content);
vals.expected_varchar_val = Slice(vals.slice_content);
vals.expected_binary_val = Slice(vals.slice_content);
vals.expected_bool_val = expected_int_val % 2;
vals.expected_float_val = expected_int_val;
vals.expected_double_val = expected_int_val;
vals.expected_decimal_val = expected_int_val;
return vals;
}
void VerifyRow(const KuduRowResult& row, int split_idx, int row_idx) {
ASSERT_OK(setup_.VerifyRowKey(row, split_idx, row_idx));
ExpectedVals vals = GetExpectedValsForRow(split_idx, row_idx);
int8_t int8_val;
ASSERT_OK(row.GetInt8("int8_val", &int8_val));
ASSERT_EQ(int8_val, vals.expected_int8_val);
int16_t int16_val;
ASSERT_OK(row.GetInt16("int16_val", &int16_val));
ASSERT_EQ(int16_val, vals.expected_int16_val);
int32_t int32_val;
ASSERT_OK(row.GetInt32("int32_val", &int32_val));
ASSERT_EQ(int32_val, vals.expected_int32_val);
int64_t int64_val;
ASSERT_OK(row.GetInt64("int64_val", &int64_val));
ASSERT_EQ(int64_val, vals.expected_int64_val);
int64_t timestamp_val;
ASSERT_OK(row.GetUnixTimeMicros("timestamp_val", &timestamp_val));
ASSERT_EQ(timestamp_val, vals.expected_timestamp_val);
int32_t date_val;
ASSERT_OK(row.GetDate("date_val", &date_val));
ASSERT_EQ(date_val, vals.expected_date_val);
Slice string_val;
ASSERT_OK(row.GetString("string_val", &string_val));
ASSERT_EQ(string_val, vals.expected_slice_val);
Slice binary_val;
ASSERT_OK(row.GetBinary("binary_val", &binary_val));
ASSERT_EQ(binary_val, vals.expected_binary_val);
Slice varchar_val;
ASSERT_OK(row.GetVarchar("varchar_val", &varchar_val));
ASSERT_EQ(varchar_val, vals.expected_varchar_val);
bool bool_val;
ASSERT_OK(row.GetBool("bool_val", &bool_val));
ASSERT_EQ(bool_val, vals.expected_bool_val);
double double_val;
ASSERT_OK(row.GetDouble("double_val", &double_val));
ASSERT_EQ(double_val, vals.expected_double_val);
float float_val;
ASSERT_OK(row.GetFloat("float_val", &float_val));
ASSERT_EQ(float_val, vals.expected_float_val);
int128_t decimal32_val;
ASSERT_OK(row.GetUnscaledDecimal("decimal32_val", &decimal32_val));
ASSERT_EQ(decimal32_val, vals.expected_decimal_val);
int128_t decimal64_val;
ASSERT_OK(row.GetUnscaledDecimal("decimal64_val", &decimal64_val));
ASSERT_EQ(decimal64_val, vals.expected_decimal_val);
int128_t decimal128_val;
ASSERT_OK(row.GetUnscaledDecimal("decimal128_val", &decimal128_val));
ASSERT_EQ(decimal128_val, vals.expected_decimal_val);
}
typedef std::function<Status (KuduScanner* scanner)> ScannerSetup;
typedef std::function<void (const KuduScanBatch& batch,
int num_tablet,
int* total_rows_in_tablet)> RowVerifier;
Status VerifyRows(const ScannerSetup& scanner_setup, const RowVerifier& verifier) {
int total_rows = 0;
// Scan a single tablet and make sure it has the rows we expect in the amount we
// expect.
for (int i = 0; i < kNumTablets; ++i) {
KuduScanner scanner(table_.get());
string low_split;
string high_split;
if (i != 0) {
const KuduPartialRow& split = split_rows_[i - 1];
RETURN_NOT_OK(scanner.AddLowerBound(split));
low_split = split.ToString();
}
if (i != kNumTablets - 1) {
const KuduPartialRow& split = split_rows_[i];
RETURN_NOT_OK(scanner.AddExclusiveUpperBound(split));
high_split = split.ToString();
}
RETURN_NOT_OK(scanner_setup(&scanner));
RETURN_NOT_OK(scanner.SetBatchSizeBytes(kMaxBatchSize));
RETURN_NOT_OK(scanner.SetFaultTolerant());
RETURN_NOT_OK(scanner.SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
RETURN_NOT_OK(scanner.Open());
LOG(INFO) << "Scanning tablet: [" << low_split << ", " << high_split << ")";
int total_rows_in_tablet = 0;
while (scanner.HasMoreRows()) {
KuduScanBatch batch;
scanner.NextBatch(&batch);
verifier(batch, i, &total_rows_in_tablet);
}
CHECK_EQ(total_rows_in_tablet, setup_.GetRowsPerTablet());
total_rows += total_rows_in_tablet;
}
CHECK_EQ(total_rows, setup_.GetRowsPerTablet() * kNumTablets);
return Status::OK();
}
void RunTest(const ScannerSetup& scanner_setup, const RowVerifier& verifier) {
ASSERT_OK(CreateCluster());
ASSERT_OK(CreateTable());
ASSERT_OK(InsertRows());
// Check that all of the replicas agree on the inserted data. This retries until
// all replicas are up-to-date, which is important to ensure that the following
// Verify always passes.
NO_FATALS(ClusterVerifier(cluster_.get()).CheckCluster());
// Check that the inserted data matches what we thought we inserted.
ASSERT_OK(VerifyRows(scanner_setup, verifier));
}
virtual void TearDown() OVERRIDE {
cluster_->AssertNoCrashes();
cluster_->Shutdown();
}
protected:
TestSetup setup_;
KuduSchema schema_;
vector<KuduPartialRow> split_rows_;
shared_ptr<KuduClient> client_;
unique_ptr<ExternalMiniCluster> cluster_;
shared_ptr<KuduTable> table_;
};
// Wrap the actual DataType so that we can have the setup structs be friends of other classes
// without leaking DataType.
template<DataType KeyType>
struct KeyTypeWrapper {
static const DataType kType = KeyType;
};
typedef ::testing::Types<IntKeysTestSetup<KeyTypeWrapper<INT8> >,
IntKeysTestSetup<KeyTypeWrapper<INT16> >,
IntKeysTestSetup<KeyTypeWrapper<INT32> >,
IntKeysTestSetup<KeyTypeWrapper<INT64> >,
IntKeysTestSetup<KeyTypeWrapper<DECIMAL32> >,
IntKeysTestSetup<KeyTypeWrapper<DECIMAL64> >,
IntKeysTestSetup<KeyTypeWrapper<DECIMAL128> >,
IntKeysTestSetup<KeyTypeWrapper<UNIXTIME_MICROS> >,
DateKeysTestSetup,
SliceKeysTestSetup<KeyTypeWrapper<STRING> >,
SliceKeysTestSetup<KeyTypeWrapper<BINARY> >
> KeyTypes;
TYPED_TEST_CASE(AllTypesItest, KeyTypes);
TYPED_TEST(AllTypesItest, TestAllKeyTypes) {
vector<string> projection;
this->SetupProjection(&projection);
auto scanner_setup = [&](KuduScanner* scanner) {
return scanner->SetProjectedColumnNames(projection);
};
auto row_verifier = [&](const KuduScanBatch& batch, int num_tablet, int* total_rows_in_tablet) {
for (int i = 0; i < batch.NumRows(); i++) {
NO_FATALS(this->VerifyRow(batch.Row(i), num_tablet, *total_rows_in_tablet + i));
}
*total_rows_in_tablet += batch.NumRows();
};
this->RunTest(scanner_setup, row_verifier);
}
TYPED_TEST(AllTypesItest, TestTimestampPadding) {
vector<string> projection;
this->SetupProjection(&projection);
auto scanner_setup = [&](KuduScanner* scanner) {
// Each time this function is called we shuffle the projection to get the chance
// of having timestamps in different places of the projection and before/after
// different types.
std::mt19937 gen(SeedRandom());
std::shuffle(projection.begin(), projection.end(), gen);
RETURN_NOT_OK(scanner->SetProjectedColumnNames(projection));
int row_format_flags = KuduScanner::NO_FLAGS;
row_format_flags |= KuduScanner::PAD_UNIXTIME_MICROS_TO_16_BYTES;
return scanner->SetRowFormatFlags(row_format_flags);
};
auto row_verifier = [&](const KuduScanBatch& batch, int num_tablet, int* total_rows_in_tablet) {
// Timestamps are padded to 16 bytes.
int kPaddedTimestampSize = 16;
// Calculate the projection size, each of the column offsets and the size of the null bitmap.
const KuduSchema* schema = batch.projection_schema();
vector<int> projection_offsets;
int row_stride = 0;
int num_nullable_cols = 0;
for (int i = 0; i < schema->num_columns(); i++) {
KuduColumnSchema col_schema = schema->Column(i);
if (col_schema.is_nullable()) num_nullable_cols++;
switch (col_schema.type()) {
case KuduColumnSchema::UNIXTIME_MICROS:
projection_offsets.push_back(kPaddedTimestampSize);
row_stride += kPaddedTimestampSize;
break;
default:
int col_size = GetTypeInfo(ToInternalDataType(col_schema.type(),
col_schema.type_attributes()))->size();
projection_offsets.push_back(col_size);
row_stride += col_size;
}
}
int non_null_bitmap_size = BitmapSize(num_nullable_cols);
row_stride += non_null_bitmap_size;
Slice direct_data = batch.direct_data();
ASSERT_EQ(direct_data.size(), row_stride * batch.NumRows());
const uint8_t* row_data = direct_data.data();
for (int i = 0; i < batch.NumRows(); i++) {
for (int j = 0; j < schema->num_columns(); j++) {
KuduColumnSchema col_schema = schema->Column(j);
if (col_schema.name() == "key") {
ASSERT_OK(this->setup_.VerifyRowKeyRaw(row_data, num_tablet, *total_rows_in_tablet + i));
} else {
ExpectedVals vals = this->GetExpectedValsForRow(num_tablet, *total_rows_in_tablet + i);
DataType internal_type = ToInternalDataType(col_schema.type(),
col_schema.type_attributes());
switch (col_schema.type()) {
case KuduColumnSchema::INT8:
ASSERT_EQ(*reinterpret_cast<const int8_t*>(row_data), vals.expected_int8_val);
break;
case KuduColumnSchema::INT16:
ASSERT_EQ(*reinterpret_cast<const int16_t*>(row_data), vals.expected_int16_val);
break;
case KuduColumnSchema::INT32:
ASSERT_EQ(*reinterpret_cast<const int32_t*>(row_data), vals.expected_int32_val);
break;
case KuduColumnSchema::INT64:
ASSERT_EQ(*reinterpret_cast<const int64_t*>(row_data), vals.expected_int64_val);
break;
case KuduColumnSchema::UNIXTIME_MICROS:
ASSERT_EQ(*reinterpret_cast<const int64_t*>(row_data), vals.expected_timestamp_val);
break;
case KuduColumnSchema::DATE:
ASSERT_EQ(*reinterpret_cast<const int32_t*>(row_data), vals.expected_date_val);
break;
case KuduColumnSchema::STRING:
ASSERT_EQ(*reinterpret_cast<const Slice*>(row_data), vals.expected_slice_val);
break;
case KuduColumnSchema::BINARY:
ASSERT_EQ(*reinterpret_cast<const Slice*>(row_data), vals.expected_binary_val);
break;
case KuduColumnSchema::VARCHAR:
ASSERT_EQ(*reinterpret_cast<const Slice*>(row_data), vals.expected_varchar_val);
break;
case KuduColumnSchema::BOOL:
ASSERT_EQ(*reinterpret_cast<const bool*>(row_data), vals.expected_bool_val);
break;
case KuduColumnSchema::FLOAT:
ASSERT_EQ(*reinterpret_cast<const float*>(row_data), vals.expected_float_val);
break;
case KuduColumnSchema::DOUBLE:
ASSERT_EQ(*reinterpret_cast<const double*>(row_data), vals.expected_double_val);
break;
case KuduColumnSchema::DECIMAL:
switch (internal_type) {
case DECIMAL32:
ASSERT_EQ(*reinterpret_cast<const int32_t*>(row_data),
vals.expected_decimal_val);
break;
case DECIMAL64:
ASSERT_EQ(*reinterpret_cast<const int64_t*>(row_data),
vals.expected_decimal_val);
break;
case DECIMAL128:
ASSERT_EQ(UnalignedLoad<int128_t>(row_data),
vals.expected_decimal_val);
break;
default:
LOG(FATAL) << "Unexpected internal decimal type: " << internal_type;
}
break;
default:
LOG(FATAL) << "Unexpected type: " << col_schema.type();
}
}
row_data += projection_offsets[j];
}
row_data += non_null_bitmap_size;
}
*total_rows_in_tablet += batch.NumRows();
};
this->RunTest(scanner_setup, row_verifier);
}
} // namespace client
} // namespace kudu