blob: 02a943a984c0259296ec70ceb8222117aec916d1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <atomic>
#include <memory>
#include <string>
#include <thread>
#include <unordered_set>
#include <vector>
#include "kudu/client/client.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/integration-tests/mini_cluster.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/util/test_util.h"
namespace kudu {
namespace client {
using sp::shared_ptr;
using std::atomic;
using std::string;
using std::thread;
using std::unique_ptr;
using std::unordered_set;
using std::vector;
using tserver::MiniTabletServer;
class ScanTokenTest : public KuduTest {
protected:
void SetUp() override {
// Set up the mini cluster
cluster_.reset(new MiniCluster(env_.get(), MiniClusterOptions()));
ASSERT_OK(cluster_->Start());
ASSERT_OK(cluster_->CreateClient(nullptr, &client_));
}
// Creates a new session in manual flush mode.
shared_ptr<KuduSession> CreateSession() {
shared_ptr<KuduSession> session = client_->NewSession();
session->SetTimeoutMillis(10000);
CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
return session;
}
// Count the rows in a table which satisfy the specified predicates. Simulates
// a central query planner / remote task execution by creating a thread per
// token, each with a new client.
int CountRows(const vector<KuduScanToken*>& tokens) {
atomic<uint32_t> rows(0);
vector<thread> threads;
for (KuduScanToken* token : tokens) {
string buf;
CHECK_OK(token->Serialize(&buf));
threads.emplace_back(thread([this, &rows] (string serialized_token) {
shared_ptr<KuduClient> client;
ASSERT_OK(cluster_->CreateClient(nullptr, &client));
KuduScanner* scanner_ptr;
ASSERT_OK(KuduScanToken::DeserializeIntoScanner(client.get(),
serialized_token,
&scanner_ptr));
unique_ptr<KuduScanner> scanner(scanner_ptr);
scanner->Open();
while (scanner->HasMoreRows()) {
KuduScanBatch batch;
CHECK_OK(scanner->NextBatch(&batch));
rows += batch.NumRows();
}
scanner->Close();
}, std::move(buf)));
}
for (thread& thread : threads) {
thread.join();
}
return rows;
}
void VerifyTabletInfo(const vector<KuduScanToken*>& tokens) {
unordered_set<string> tablet_ids;
for (auto t : tokens) {
tablet_ids.insert(t->tablet().id());
// Check that there's only one replica; this is a non-replicated table.
ASSERT_EQ(1, t->tablet().replicas().size());
// Check that this replica is a leader; since there's only one tserver,
// it must be.
const KuduReplica* r = t->tablet().replicas()[0];
ASSERT_TRUE(r->is_leader());
// Check that the tserver associated with the replica is the sole tserver
// started for this cluster.
const MiniTabletServer* ts = cluster_->mini_tablet_server(0);
ASSERT_EQ(ts->server()->instance_pb().permanent_uuid(),
r->ts().uuid());
ASSERT_EQ(ts->bound_rpc_addr().host(), r->ts().hostname());
ASSERT_EQ(ts->bound_rpc_addr().port(), r->ts().port());
}
// Check that there are no duplicate tablet IDs.
ASSERT_EQ(tokens.size(), tablet_ids.size());
}
shared_ptr<KuduClient> client_;
gscoped_ptr<MiniCluster> cluster_;
};
TEST_F(ScanTokenTest, TestScanTokens) {
// Create schema
KuduSchema schema;
{
KuduSchemaBuilder builder;
builder.AddColumn("col")->NotNull()->Type(KuduColumnSchema::INT64)->PrimaryKey();
ASSERT_OK(builder.Build(&schema));
}
// Create table
shared_ptr<KuduTable> table;
{
unique_ptr<KuduPartialRow> split(schema.NewRow());
ASSERT_OK(split->SetInt64("col", 0));
unique_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
ASSERT_OK(table_creator->table_name("table")
.schema(&schema)
.add_hash_partitions({ "col" }, 4)
.split_rows({ split.release() })
.num_replicas(1)
.Create());
ASSERT_OK(client_->OpenTable("table", &table));
}
// Create session
shared_ptr<KuduSession> session = client_->NewSession();
session->SetTimeoutMillis(10000);
ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
// Insert rows
for (int i = -100; i < 100; i++) {
unique_ptr<KuduInsert> insert(table->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("col", i));
ASSERT_OK(session->Apply(insert.release()));
}
ASSERT_OK(session->Flush());
{ // no predicates
vector<KuduScanToken*> tokens;
ElementDeleter deleter(&tokens);
ASSERT_OK(KuduScanTokenBuilder(table.get()).Build(&tokens));
ASSERT_EQ(8, tokens.size());
ASSERT_EQ(200, CountRows(tokens));
NO_FATALS(VerifyTabletInfo(tokens));
}
{ // range predicate
vector<KuduScanToken*> tokens;
ElementDeleter deleter(&tokens);
KuduScanTokenBuilder builder(table.get());
unique_ptr<KuduPredicate> predicate(table->NewComparisonPredicate("col",
KuduPredicate::GREATER_EQUAL,
KuduValue::FromInt(0)));
ASSERT_OK(builder.AddConjunctPredicate(predicate.release()));
ASSERT_OK(builder.Build(&tokens));
ASSERT_EQ(4, tokens.size());
ASSERT_EQ(100, CountRows(tokens));
NO_FATALS(VerifyTabletInfo(tokens));
}
{ // equality predicate
vector<KuduScanToken*> tokens;
ElementDeleter deleter(&tokens);
KuduScanTokenBuilder builder(table.get());
unique_ptr<KuduPredicate> predicate(table->NewComparisonPredicate("col",
KuduPredicate::EQUAL,
KuduValue::FromInt(42)));
ASSERT_OK(builder.AddConjunctPredicate(predicate.release()));
ASSERT_OK(builder.Build(&tokens));
ASSERT_EQ(1, tokens.size());
ASSERT_EQ(1, CountRows(std::move(tokens)));
NO_FATALS(VerifyTabletInfo(tokens));
}
{ // primary key bound
vector<KuduScanToken*> tokens;
ElementDeleter deleter(&tokens);
KuduScanTokenBuilder builder(table.get());
unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
ASSERT_OK(lower_bound->SetInt64("col", 40));
ASSERT_OK(builder.AddLowerBound(*lower_bound));
ASSERT_OK(builder.Build(&tokens));
ASSERT_EQ(4, tokens.size());
ASSERT_EQ(60, CountRows(tokens));
NO_FATALS(VerifyTabletInfo(tokens));
}
}
TEST_F(ScanTokenTest, TestScanTokensWithNonCoveringRange) {
// Create schema
KuduSchema schema;
{
KuduSchemaBuilder builder;
builder.AddColumn("col")->NotNull()->Type(KuduColumnSchema::INT64)->PrimaryKey();
ASSERT_OK(builder.Build(&schema));
}
// Create table
shared_ptr<KuduTable> table;
{
unique_ptr<KuduPartialRow> lower_bound(schema.NewRow());
unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
unique_ptr<client::KuduTableCreator> table_creator(client_->NewTableCreator());
table_creator->table_name("table");
table_creator->num_replicas(1);
table_creator->schema(&schema);
ASSERT_OK(lower_bound->SetInt64("col", 0));
ASSERT_OK(upper_bound->SetInt64("col", 100));
table_creator->add_range_partition(lower_bound.release(), upper_bound.release());
lower_bound.reset(schema.NewRow());
upper_bound.reset(schema.NewRow());
ASSERT_OK(lower_bound->SetInt64("col", 200));
ASSERT_OK(upper_bound->SetInt64("col", 400));
table_creator->add_range_partition(lower_bound.release(), upper_bound.release());
unique_ptr<KuduPartialRow> split(schema.NewRow());
ASSERT_OK(split->SetInt64("col", 300));
table_creator->add_range_partition_split(split.release());
table_creator->add_hash_partitions({ "col" }, 2);
ASSERT_OK(table_creator->Create());
ASSERT_OK(client_->OpenTable("table", &table));
}
// Create session
shared_ptr<KuduSession> session = client_->NewSession();
session->SetTimeoutMillis(10000);
ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
// Insert rows
for (int i = 0; i < 100; i++) {
unique_ptr<KuduInsert> insert(table->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("col", i));
ASSERT_OK(session->Apply(insert.release()));
}
for (int i = 200; i < 400; i++) {
unique_ptr<KuduInsert> insert(table->NewInsert());
ASSERT_OK(insert->mutable_row()->SetInt64("col", i));
ASSERT_OK(session->Apply(insert.release()));
}
ASSERT_OK(session->Flush());
{ // no predicates
vector<KuduScanToken*> tokens;
ElementDeleter deleter(&tokens);
ASSERT_OK(KuduScanTokenBuilder(table.get()).Build(&tokens));
ASSERT_EQ(6, tokens.size());
ASSERT_EQ(300, CountRows(tokens));
NO_FATALS(VerifyTabletInfo(tokens));
}
{ // range predicate
vector<KuduScanToken*> tokens;
ElementDeleter deleter(&tokens);
KuduScanTokenBuilder builder(table.get());
unique_ptr<KuduPredicate> predicate(table->NewComparisonPredicate("col",
KuduPredicate::GREATER_EQUAL,
KuduValue::FromInt(200)));
ASSERT_OK(builder.AddConjunctPredicate(predicate.release()));
ASSERT_OK(builder.Build(&tokens));
ASSERT_EQ(4, tokens.size());
ASSERT_EQ(200, CountRows(tokens));
NO_FATALS(VerifyTabletInfo(tokens));
}
{ // equality predicate
vector<KuduScanToken*> tokens;
ElementDeleter deleter(&tokens);
KuduScanTokenBuilder builder(table.get());
unique_ptr<KuduPredicate> predicate(table->NewComparisonPredicate("col",
KuduPredicate::EQUAL,
KuduValue::FromInt(42)));
ASSERT_OK(builder.AddConjunctPredicate(predicate.release()));
ASSERT_OK(builder.Build(&tokens));
ASSERT_EQ(1, tokens.size());
ASSERT_EQ(1, CountRows(tokens));
NO_FATALS(VerifyTabletInfo(tokens));
}
{ // primary key bound
vector<KuduScanToken*> tokens;
ElementDeleter deleter(&tokens);
KuduScanTokenBuilder builder(table.get());
unique_ptr<KuduPartialRow> upper_bound(schema.NewRow());
ASSERT_OK(upper_bound->SetInt64("col", 40));
ASSERT_OK(builder.AddUpperBound(*upper_bound));
ASSERT_OK(builder.Build(&tokens));
ASSERT_EQ(2, tokens.size());
ASSERT_EQ(40, CountRows(tokens));
NO_FATALS(VerifyTabletInfo(tokens));
}
}
} // namespace client
} // namespace kudu