blob: 80f6cc8d7bd5659664b92d2461ba0cc111e34678 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <ostream>
#include <string>
#include <thread>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/client/callbacks.h"
#include "kudu/client/client.h"
#include "kudu/client/row_result.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/write_op.h"
#include "kudu/common/partial_row.h"
#include "kudu/gutil/port.h"
#include "kudu/master/mini_master.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/util/async_util.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/curl_util.h"
#include "kudu/util/faststring.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DECLARE_int32(flush_threshold_mb);
DECLARE_int32(log_segment_size_mb);
DECLARE_int32(maintenance_manager_polling_interval_ms);
DEFINE_int32(mbs_for_flushes_and_rolls, 1, "How many MBs are needed to flush and roll");
DEFINE_int32(row_count, 2000, "How many rows will be used in this test for the base data");
DEFINE_int32(seconds_to_run, 4,
"How long this test runs for, after inserting the base data, in seconds");
using kudu::client::KuduInsert;
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduColumnSchema;
using kudu::client::KuduRowResult;
using kudu::client::KuduScanBatch;
using kudu::client::KuduScanner;
using kudu::client::KuduSchema;
using kudu::client::KuduSchemaBuilder;
using kudu::client::KuduSession;
using kudu::client::KuduStatusCallback;
using kudu::client::KuduStatusMemberCallback;
using kudu::client::KuduTable;
using kudu::client::KuduTableCreator;
using kudu::client::KuduUpdate;
using kudu::client::sp::shared_ptr;
using kudu::cluster::InternalMiniCluster;
using kudu::cluster::InternalMiniClusterOptions;
using std::string;
using std::thread;
using std::unique_ptr;
using std::vector;
namespace kudu {
namespace tablet {
// This integration test tries to trigger all the update-related bits while also serving as a
// foundation for benchmarking. It first inserts 'row_count' rows and then starts two threads,
// one that continuously updates all the rows sequentially and one that scans them all, until
// it's been running for 'seconds_to_run'. It doesn't test for correctness, unless something
// FATALs.
class UpdateScanDeltaCompactionTest : public KuduTest {
protected:
UpdateScanDeltaCompactionTest() {
KuduSchemaBuilder b;
b.AddColumn("key")->Type(KuduColumnSchema::INT64)->NotNull()->PrimaryKey();
b.AddColumn("string")->Type(KuduColumnSchema::STRING)->NotNull();
b.AddColumn("int64")->Type(KuduColumnSchema::INT64)->NotNull();
CHECK_OK(b.Build(&schema_));
}
virtual void SetUp() OVERRIDE {
KuduTest::SetUp();
}
void CreateTable() {
NO_FATALS(InitCluster());
unique_ptr<KuduTableCreator> table_creator(client_->NewTableCreator());
ASSERT_OK(table_creator->table_name(kTableName)
.schema(&schema_)
.set_range_partition_columns({ "key" })
.num_replicas(1)
.Create());
ASSERT_OK(client_->OpenTable(kTableName, &table_));
}
virtual void TearDown() OVERRIDE {
if (cluster_) {
cluster_->Shutdown();
}
KuduTest::TearDown();
}
// Inserts row_count rows sequentially.
void InsertBaseData();
// Starts the update and scan threads then stops them after seconds_to_run.
void RunThreads();
private:
enum {
kKeyCol,
kStrCol,
kInt64Col
};
static const char* const kTableName;
void InitCluster() {
// Start mini-cluster with 1 tserver.
cluster_.reset(new InternalMiniCluster(env_, InternalMiniClusterOptions()));
ASSERT_OK(cluster_->Start());
KuduClientBuilder client_builder;
client_builder.add_master_server_addr(
cluster_->mini_master()->bound_rpc_addr_str());
ASSERT_OK(client_builder.Build(&client_));
}
shared_ptr<KuduSession> CreateSession() {
shared_ptr<KuduSession> session = client_->NewSession();
session->SetTimeoutMillis(30000);
CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
return session;
}
// Continuously updates the existing data until 'stop_latch' drops to 0.
void UpdateRows(CountDownLatch* stop_latch);
// Continuously scans the data until 'stop_latch' drops to 0.
void ScanRows(CountDownLatch* stop_latch) const;
// Continuously fetch various web pages on the TS.
void CurlWebPages(CountDownLatch* stop_latch) const;
// Sets the passed values on the row.
// TODO randomize the string column.
void MakeRow(int64_t key, int64_t val, KuduPartialRow* row) const;
// If 'key' is a multiple of kSessionBatchSize, it uses 'last_s' to wait for the previous batch
// to finish and then flushes the current one.
Status WaitForLastBatchAndFlush(int64_t key,
Synchronizer* last_s,
KuduStatusCallback* last_s_cb,
shared_ptr<KuduSession> session);
KuduSchema schema_;
std::shared_ptr<InternalMiniCluster> cluster_;
shared_ptr<KuduTable> table_;
shared_ptr<KuduClient> client_;
};
const char* const UpdateScanDeltaCompactionTest::kTableName = "update-scan-delta-compact-tbl";
const int kSessionBatchSize = 1000;
TEST_F(UpdateScanDeltaCompactionTest, TestAll) {
OverrideFlagForSlowTests("seconds_to_run", "100");
OverrideFlagForSlowTests("row_count", "1000000");
OverrideFlagForSlowTests("mbs_for_flushes_and_rolls", "8");
// Setting this high enough that we see the effects of flushes and compactions.
OverrideFlagForSlowTests("maintenance_manager_polling_interval_ms", "2000");
FLAGS_flush_threshold_mb = FLAGS_mbs_for_flushes_and_rolls;
FLAGS_log_segment_size_mb = FLAGS_mbs_for_flushes_and_rolls;
if (!AllowSlowTests()) {
// Make it run more often since it's not a long test.
FLAGS_maintenance_manager_polling_interval_ms = 50;
}
NO_FATALS(CreateTable());
NO_FATALS(InsertBaseData());
NO_FATALS(RunThreads());
}
void UpdateScanDeltaCompactionTest::InsertBaseData() {
shared_ptr<KuduSession> session = CreateSession();
Synchronizer last_s;
KuduStatusMemberCallback<Synchronizer> last_s_cb(&last_s,
&Synchronizer::StatusCB);
last_s_cb.Run(Status::OK());
LOG_TIMING(INFO, "Insert") {
for (int64_t key = 0; key < FLAGS_row_count; key++) {
unique_ptr<KuduInsert> insert(table_->NewInsert());
MakeRow(key, 0, insert->mutable_row());
ASSERT_OK(session->Apply(insert.release()));
ASSERT_OK(WaitForLastBatchAndFlush(key, &last_s, &last_s_cb, session));
}
ASSERT_OK(WaitForLastBatchAndFlush(kSessionBatchSize, &last_s, &last_s_cb, session));
ASSERT_OK(last_s.Wait());
}
}
void UpdateScanDeltaCompactionTest::RunThreads() {
vector<thread> threads;
CountDownLatch stop_latch(1);
threads.emplace_back([this, &stop_latch]() { this->UpdateRows(&stop_latch); });
threads.emplace_back([this, &stop_latch]() { this->ScanRows(&stop_latch); });
threads.emplace_back([this, &stop_latch]() { this->CurlWebPages(&stop_latch); });
SleepFor(MonoDelta::FromSeconds(FLAGS_seconds_to_run * 1.0));
stop_latch.CountDown();
for (auto& t : threads) {
t.join();
}
}
void UpdateScanDeltaCompactionTest::UpdateRows(CountDownLatch* stop_latch) {
shared_ptr<KuduSession> session = CreateSession();
Synchronizer last_s;
KuduStatusMemberCallback<Synchronizer> last_s_cb(&last_s,
&Synchronizer::StatusCB);
for (int64_t iteration = 1; stop_latch->count() > 0; iteration++) {
last_s_cb.Run(Status::OK());
LOG_TIMING(INFO, "Update") {
for (int64_t key = 0; key < FLAGS_row_count && stop_latch->count() > 0; key++) {
unique_ptr<KuduUpdate> update(table_->NewUpdate());
MakeRow(key, iteration, update->mutable_row());
CHECK_OK(session->Apply(update.release()));
CHECK_OK(WaitForLastBatchAndFlush(key, &last_s, &last_s_cb, session));
}
CHECK_OK(WaitForLastBatchAndFlush(kSessionBatchSize, &last_s, &last_s_cb, session));
CHECK_OK(last_s.Wait());
}
}
}
void UpdateScanDeltaCompactionTest::ScanRows(CountDownLatch* stop_latch) const {
while (stop_latch->count() > 0) {
KuduScanner scanner(table_.get());
// Sometimes use fault-tolerant scans, to get more coverage of the
// MergeIterator code paths.
if (rand() % 2 == 1) {
CHECK_OK(scanner.SetFaultTolerant());
}
LOG_TIMING(INFO, "Scan") {
CHECK_OK(scanner.Open());
KuduScanBatch batch;
while (scanner.HasMoreRows()) {
CHECK_OK(scanner.NextBatch(&batch));
}
}
}
}
void UpdateScanDeltaCompactionTest::CurlWebPages(CountDownLatch* stop_latch) const {
vector<string> urls;
string base_url = cluster_->mini_tablet_server(0)->bound_http_addr().ToString();
urls.push_back(base_url + "/scans");
urls.push_back(base_url + "/transactions");
EasyCurl curl;
faststring dst;
while (stop_latch->count() > 0) {
for (const string& url : urls) {
VLOG(1) << "Curling URL " << url;
Status status = curl.FetchURL(url, &dst);
if (status.ok()) {
CHECK_GT(dst.length(), 0);
}
}
}
}
void UpdateScanDeltaCompactionTest::MakeRow(int64_t key,
int64_t val,
KuduPartialRow* row) const {
CHECK_OK(row->SetInt64(kKeyCol, key));
CHECK_OK(row->SetStringCopy(kStrCol, "TODO random string"));
CHECK_OK(row->SetInt64(kInt64Col, val));
}
Status UpdateScanDeltaCompactionTest::WaitForLastBatchAndFlush(int64_t key,
Synchronizer* last_s,
KuduStatusCallback* last_s_cb,
shared_ptr<KuduSession> session) {
if (key % kSessionBatchSize == 0) {
RETURN_NOT_OK(last_s->Wait());
last_s->Reset();
session->FlushAsync(last_s_cb);
}
return Status::OK();
}
} // namespace tablet
} // namespace kudu