blob: 05f29b3e05e0cb10a43fe8b47362c157d3ae5ebf [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <thread>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/codegen/compilation_manager.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/iterator.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/rowblock_memory.h"
#include "kudu/common/rowid.h"
#include "kudu/common/schema.h"
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/local_tablet_writer.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/tablet-harness.h"
#include "kudu/tablet/tablet-test-base.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tablet/tablet.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/faststring.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_graph.h"
#include "kudu/util/test_util.h"
DECLARE_int32(tablet_history_max_age_sec);
DECLARE_double(tablet_delta_store_major_compact_min_ratio);
DECLARE_int32(tablet_delta_store_minor_compact_max);
DEFINE_int32(num_insert_threads, 8, "Number of inserting threads to launch");
DEFINE_int32(num_counter_threads, 8, "Number of counting threads to launch");
DEFINE_int32(num_summer_threads, 1, "Number of summing threads to launch");
DEFINE_int32(num_slowreader_threads, 1, "Number of 'slow' reader threads to launch");
DEFINE_int32(num_flush_threads, 1, "Number of flusher reader threads to launch");
DEFINE_int32(num_compact_threads, 1, "Number of compactor threads to launch");
DEFINE_int32(num_undo_delta_gc_threads, 1, "Number of undo delta gc threads to launch");
DEFINE_int32(num_deleted_rowset_gc_threads, 1, "Number of deleted rowset gc threads to launch");
DEFINE_int32(num_updater_threads, 1, "Number of updating threads to launch");
DEFINE_int32(num_flush_delta_threads, 1, "Number of delta flusher reader threads to launch");
DEFINE_int32(num_minor_compact_deltas_threads, 1,
"Number of delta minor compactor threads to launch");
DEFINE_int32(num_major_compact_deltas_threads, 1,
"Number of delta major compactor threads to launch");
DEFINE_int64(inserts_per_thread, 1000,
"Number of rows inserted by each inserter thread");
DEFINE_int32(tablet_test_flush_threshold_mb, 0, "Minimum memrowset size to flush");
DEFINE_double(flusher_backoff, 2.0f, "Ratio to backoff the flusher thread");
DEFINE_int32(flusher_initial_frequency_ms, 30, "Number of ms to wait between flushes");
using std::shared_ptr;
using std::thread;
using std::unique_ptr;
using std::vector;
namespace kudu {
namespace tablet {
namespace {
const MonoDelta kBackgroundOpInterval = MonoDelta::FromMilliseconds(100);
} // anonymous namespace
template<class SETUP>
class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
// Import some names from superclass, since C++ is stingy about
// letting us refer to the members otherwise.
typedef TabletTestBase<SETUP> superclass;
using superclass::schema_;
using superclass::client_schema_;
using superclass::tablet;
using superclass::setup_;
public:
virtual void SetUp() {
superclass::SetUp();
// Warm up code cache with all the projections we'll be using.
unique_ptr<RowwiseIterator> iter;
CHECK_OK(tablet()->NewRowIterator(client_schema_, &iter));
uint64_t count;
CHECK_OK(tablet()->CountRows(&count));
const Schema* schema = tablet()->schema().get();
ColumnSchema valcol = schema->column(schema->find_column("val"));
valcol_projection_ = Schema({ valcol }, 0);
CHECK_OK(tablet()->NewRowIterator(valcol_projection_, &iter));
codegen::CompilationManager::GetSingleton()->Wait();
ts_collector_.StartDumperThread();
}
explicit MultiThreadedTabletTest(TabletHarness::Options::ClockType clock_type =
TabletHarness::Options::ClockType::LOGICAL_CLOCK)
: TabletTestBase<SETUP>(clock_type),
running_insert_count_(FLAGS_num_insert_threads),
ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) {}
void InsertThread(int tid) {
CountDownOnScopeExit dec_count(&running_insert_count_);
shared_ptr<TimeSeries> inserts = ts_collector_.GetTimeSeries("inserted");
// TODO: add a test where some of the inserts actually conflict
// on the same row.
uint64_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * FLAGS_num_insert_threads)
/ FLAGS_num_insert_threads;
if (max_rows < FLAGS_inserts_per_thread) {
LOG(WARNING) << "Clamping the inserts per thread to " << max_rows << " to prevent overflow";
}
this->InsertTestRows(tid * max_rows,
max_rows, 0,
inserts.get());
}
void UpdateThread(int tid) {
const Schema &schema = schema_;
shared_ptr<TimeSeries> updates = ts_collector_.GetTimeSeries("updated");
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
RowBlockMemory mem(1024);
RowBlock block(&schema_, 1, &mem);
faststring update_buf;
uint64_t updates_since_last_report = 0;
int col_idx = schema.num_key_columns() == 1 ? 2 : 3;
LOG(INFO) << "Update thread using schema: " << schema.ToString();
KuduPartialRow row(&client_schema_);
while (running_insert_count_.count() > 0) {
unique_ptr<RowwiseIterator> iter;
CHECK_OK(tablet()->NewRowIterator(client_schema_, &iter));
CHECK_OK(iter->Init(nullptr));
while (iter->HasNext() && running_insert_count_.count() > 0) {
mem.Reset();
CHECK_OK(iter->NextBlock(&block));
CHECK_EQ(block.nrows(), 1);
if (!block.selection_vector()->IsRowSelected(0)) {
// Don't try to update rows which aren't visible yet --
// this will crash, since the data in row_slice isn't even copied.
continue;
}
RowBlockRow rb_row = block.row(0);
if (rand() % 10 == 7) {
// Increment the "val"
const int32_t *old_val = schema.ExtractColumnFromRow<INT32>(rb_row, col_idx);
// Issue an update. In the NullableValue setup, many of the rows start with
// NULL here, so we have to check for it.
int32_t new_val;
if (old_val != nullptr) {
new_val = *old_val + 1;
} else {
new_val = 0;
}
// Rebuild the key by extracting the cells from the row
setup_.BuildRowKeyFromExistingRow(&row, rb_row);
CHECK_OK(row.SetInt32(col_idx, new_val));
CHECK_OK(writer.Update(row));
if (++updates_since_last_report >= 10) {
updates->AddValue(updates_since_last_report);
updates_since_last_report = 0;
}
}
}
}
}
// Thread which repeatedly issues CountRows() and makes sure
// that the count doesn't go ever down.
void CountThread(int tid) {
rowid_t last_count = 0;
while (running_insert_count_.count() > 0) {
uint64_t count;
CHECK_OK(tablet()->CountRows(&count));
ASSERT_GE(count, last_count);
last_count = count;
}
}
// Thread which iterates slowly over the first 10% of the data.
// This is meant to test that outstanding iterators don't end up
// trying to reference already-freed memrowset memory.
void SlowReaderThread(int /*tid*/) {
RowBlockMemory mem(32 * 1024);
RowBlock block(&schema_, 1, &mem);
uint64_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * FLAGS_num_insert_threads)
/ FLAGS_num_insert_threads;
int max_iters = FLAGS_num_insert_threads * max_rows / 10;
while (running_insert_count_.count() > 0) {
unique_ptr<RowwiseIterator> iter;
CHECK_OK(tablet()->NewRowIterator(client_schema_, &iter));
CHECK_OK(iter->Init(nullptr));
for (int i = 0; i < max_iters && iter->HasNext(); i++) {
mem.Reset();
CHECK_OK(iter->NextBlock(&block));
if (running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(1))) {
return;
}
}
}
}
void SummerThread(int tid) {
shared_ptr<TimeSeries> scanned_ts = ts_collector_.GetTimeSeries(
"scanned");
while (running_insert_count_.count() > 0) {
CountSum(scanned_ts);
}
}
uint64_t CountSum(const shared_ptr<TimeSeries> &scanned_ts) {
RowBlockMemory mem(1024); // unused, just scanning ints
static const int kBufInts = 1024*1024 / 8;
RowBlock block(&valcol_projection_, kBufInts, &mem);
ColumnBlock column = block.column_block(0);
uint64_t count_since_report = 0;
int64_t sum = 0;
unique_ptr<RowwiseIterator> iter;
CHECK_OK(tablet()->NewRowIterator(valcol_projection_, &iter));
CHECK_OK(iter->Init(nullptr));
while (iter->HasNext()) {
mem.Reset();
CHECK_OK(iter->NextBlock(&block));
for (size_t j = 0; j < block.nrows(); j++) {
sum += *reinterpret_cast<const int32_t *>(column.cell_ptr(j));
}
count_since_report += block.nrows();
// Report metrics if enough time has passed
if (count_since_report > 100) {
if (scanned_ts.get()) {
scanned_ts->AddValue(count_since_report);
}
count_since_report = 0;
}
}
if (scanned_ts.get()) {
scanned_ts->AddValue(count_since_report);
}
return sum;
}
void FlushThread(int /*tid*/) {
// Start off with a very short wait time between flushes.
// But, especially in debug mode, this will only allow a few
// rows to get inserted between each flush, and the test will take
// quite a while. So, after every flush, we double the wait time below.
int wait_time = FLAGS_flusher_initial_frequency_ms;
while (running_insert_count_.count() > 0) {
if (tablet()->MemRowSetSize() > FLAGS_tablet_test_flush_threshold_mb * 1024 * 1024) {
CHECK_OK(tablet()->Flush());
} else {
LOG(INFO) << "Not flushing, memrowset not very full";
}
if (tablet()->DeltaMemStoresSize() > FLAGS_tablet_test_flush_threshold_mb * 1024 * 1024) {
CHECK_OK(tablet()->FlushBiggestDMS());
}
// Wait, unless the inserters are all done.
running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(wait_time));
wait_time *= FLAGS_flusher_backoff;
}
}
void FlushDeltasThread(int /*tid*/) {
while (running_insert_count_.count() > 0) {
CHECK_OK(tablet()->FlushBiggestDMS());
// Wait, unless the inserters are all done.
running_insert_count_.WaitFor(kBackgroundOpInterval);
}
}
void MinorCompactDeltasThread(int /*tid*/) {
CompactDeltas(RowSet::MINOR_DELTA_COMPACTION);
}
void MajorCompactDeltasThread(int /*tid*/) {
CompactDeltas(RowSet::MAJOR_DELTA_COMPACTION);
}
void CompactDeltas(RowSet::DeltaCompactionType type) {
while (running_insert_count_.count() > 0) {
VLOG(1) << "Compacting worst deltas";
CHECK_OK(tablet()->CompactWorstDeltas(type));
// Wait, unless the inserters are all done.
running_insert_count_.WaitFor(kBackgroundOpInterval);
}
}
void CompactThread(int /*tid*/) {
while (running_insert_count_.count() > 0) {
CHECK_OK(tablet()->Compact(Tablet::COMPACT_NO_FLAGS));
// Wait, unless the inserters are all done.
running_insert_count_.WaitFor(kBackgroundOpInterval);
}
}
void DeleteAncientUndoDeltasThread(int /*tid*/) {
while (running_insert_count_.count() > 0) {
MonoDelta time_budget = kBackgroundOpInterval;
int64_t bytes_in_ancient_undos = 0;
CHECK_OK(tablet()->InitAncientUndoDeltas(time_budget, &bytes_in_ancient_undos));
VLOG(1) << "Found " << bytes_in_ancient_undos << " bytes of ancient delta undos";
int64_t blocks_deleted = 0;
int64_t bytes_deleted = 0;
CHECK_OK(tablet()->DeleteAncientUndoDeltas(&blocks_deleted, &bytes_deleted));
if (blocks_deleted > 0) {
LOG(INFO) << "Deleted " << blocks_deleted << " blocks (" << bytes_deleted << " bytes) "
<< "of ancient delta undos";
}
// Wait, unless the inserters are all done.
running_insert_count_.WaitFor(kBackgroundOpInterval);
}
}
// Thread that looks for rowsets that are ancient and fully deleted, GCing
// those that are.
void DeleteAncientDeletedRowsetsThreads(int /*tid*/) {
do {
int64_t bytes_in_ancient_deleted_rowsets = 0;
CHECK_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes_in_ancient_deleted_rowsets));
VLOG(1) << Substitute("Found $0 bytes in ancient, fully deleted rowsets",
bytes_in_ancient_deleted_rowsets);
if (bytes_in_ancient_deleted_rowsets > 0) {
CHECK_OK(tablet()->DeleteAncientDeletedRowsets());
LOG(INFO) << Substitute("Deleted $0 bytes found in ancient, fully deleted rowsets",
bytes_in_ancient_deleted_rowsets);
}
} while (!running_insert_count_.WaitFor(kBackgroundOpInterval));
}
// Thread which cycles between inserting and deleting a test row, each time
// with a different value.
void DeleteAndReinsertCycleThread(int tid) {
int32_t iteration = 0;
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
while (running_insert_count_.count() > 0) {
for (int i = 0; i < 100; i++) {
CHECK_OK(this->InsertTestRow(&writer, tid, iteration++));
CHECK_OK(this->DeleteTestRow(&writer, tid));
}
}
}
// Thread which continuously sends updates at the same row, ignoring any
// "not found" errors that might come back. This is used simultaneously with
// DeleteAndReinsertCycleThread to check for races where we might accidentally
// succeed in UPDATING a ghost row.
void StubbornlyUpdateSameRowThread(int tid) {
int32_t iteration = 0;
LocalTabletWriter writer(this->tablet().get(), &this->client_schema_);
while (running_insert_count_.count() > 0) {
for (int i = 0; i < 100; i++) {
Status s = this->UpdateTestRow(&writer, tid, iteration++);
if (!s.ok() && !s.IsNotFound()) {
// We expect "not found", but not any other errors.
CHECK_OK(s);
}
}
}
}
// Thread which wakes up periodically and collects metrics like memrowset
// size, etc. Eventually we should have a metrics system to collect things
// like this, but for now, this is what we've got.
void CollectStatisticsThread(int tid) {
shared_ptr<TimeSeries> num_rowsets_ts = ts_collector_.GetTimeSeries(
"num_rowsets");
shared_ptr<TimeSeries> memrowset_size_ts = ts_collector_.GetTimeSeries(
"memrowset_kb");
shared_ptr<TimeSeries> num_live_rows_ts = ts_collector_.GetTimeSeries(
"num_live_rows");
while (running_insert_count_.count() > 0) {
num_rowsets_ts->SetValue(tablet()->num_rowsets());
memrowset_size_ts->SetValue(tablet()->MemRowSetSize() / 1024.0);
uint64_t num_live_rows;
ignore_result(tablet()->CountLiveRows(&num_live_rows));
num_live_rows_ts->SetValue(num_live_rows);
// Wait, unless the inserters are all done.
running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(10));
}
}
void StartThreads(int n_threads, const std::function<void(int)>& function) {
for (int i = 0; i < n_threads; i++) {
threads_.emplace_back([=]() { function(i); });
}
}
void JoinThreads() {
for (auto& t : threads_) {
t.join();
}
}
vector<thread> threads_;
CountDownLatch running_insert_count_;
// Projection with only an int column.
// This is provided by both harnesses.
Schema valcol_projection_;
TimeSeriesCollector ts_collector_;
};
TYPED_TEST_SUITE(MultiThreadedTabletTest, TabletTestHelperTypes);
TYPED_TEST(MultiThreadedTabletTest, DoTestAllAtOnce) {
if (1000 == FLAGS_inserts_per_thread) {
if (AllowSlowTests()) {
FLAGS_inserts_per_thread = 50000;
}
}
// Spawn a bunch of threads, each of which will do updates.
this->StartThreads(1, [this](int i) { this->CollectStatisticsThread(i); });
this->StartThreads(FLAGS_num_insert_threads,
[this](int i) { this->InsertThread(i); });
this->StartThreads(FLAGS_num_counter_threads,
[this](int i) { this->CountThread(i); });
this->StartThreads(FLAGS_num_summer_threads,
[this](int i) { this->SummerThread(i); });
this->StartThreads(FLAGS_num_flush_threads,
[this](int i) { this->FlushThread(i); });
this->StartThreads(FLAGS_num_compact_threads,
[this](int i) { this->CompactThread(i); });
this->StartThreads(FLAGS_num_undo_delta_gc_threads,
[this](int i) { this->DeleteAncientUndoDeltasThread(i); });
this->StartThreads(FLAGS_num_flush_delta_threads,
[this](int i) { this->FlushDeltasThread(i); });
this->StartThreads(FLAGS_num_minor_compact_deltas_threads,
[this](int i) { this->MinorCompactDeltasThread(i); });
this->StartThreads(FLAGS_num_major_compact_deltas_threads,
[this](int i) { this->MajorCompactDeltasThread(i); });
this->StartThreads(FLAGS_num_slowreader_threads,
[this](int i) { this->SlowReaderThread(i); });
this->StartThreads(FLAGS_num_updater_threads,
[this](int i) { this->UpdateThread(i); });
this->JoinThreads();
LOG_TIMING(INFO, "Summing int32 column") {
uint64_t sum = this->CountSum(shared_ptr<TimeSeries>());
LOG(INFO) << "Sum = " << sum;
}
uint64_t max_rows = this->ClampRowCount(FLAGS_inserts_per_thread * FLAGS_num_insert_threads)
/ FLAGS_num_insert_threads;
this->VerifyTestRows(0, max_rows * FLAGS_num_insert_threads);
}
// Start up a bunch of threads which repeatedly insert and delete the same
// row, while flushing and compacting. This checks various concurrent handling
// of DELETE/REINSERT during flushes.
TYPED_TEST(MultiThreadedTabletTest, DeleteAndReinsert) {
google::FlagSaver saver;
FLAGS_flusher_backoff = 1.0F;
FLAGS_flusher_initial_frequency_ms = 1;
FLAGS_tablet_delta_store_major_compact_min_ratio = 0.01F;
FLAGS_tablet_delta_store_minor_compact_max = 10;
this->StartThreads(1, [this](int i) { this->CollectStatisticsThread(i); });
this->StartThreads(FLAGS_num_flush_threads,
[this](int i) { this->FlushThread(i); });
this->StartThreads(FLAGS_num_compact_threads,
[this](int i) { this->CompactThread(i); });
this->StartThreads(FLAGS_num_undo_delta_gc_threads,
[this](int i) { this->DeleteAncientUndoDeltasThread(i); });
this->StartThreads(FLAGS_num_flush_delta_threads,
[this](int i) { this->FlushDeltasThread(i); });
this->StartThreads(FLAGS_num_minor_compact_deltas_threads,
[this](int i) { this->MinorCompactDeltasThread(i); });
this->StartThreads(FLAGS_num_major_compact_deltas_threads,
[this](int i) { this->MajorCompactDeltasThread(i); });
this->StartThreads(10,
[this](int i) { this->DeleteAndReinsertCycleThread(i); });
this->StartThreads(10,
[this](int i) { this->StubbornlyUpdateSameRowThread(i); });
// Run very quickly in dev builds, longer in slow builds.
float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
Stopwatch sw;
sw.start();
while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND &&
!this->HasFatalFailure()) {
SleepFor(MonoDelta::FromMicroseconds(5000));
}
// This is sort of a hack -- the flusher thread stops when it sees this
// countdown latch go to 0.
this->running_insert_count_.Reset(0);
this->JoinThreads();
}
// For tests where we want to use the hybrid clock. The hybrid clock is
// required for tablet history gc.
template<class SETUP>
class MultiThreadedHybridClockTabletTest : public MultiThreadedTabletTest<SETUP> {
public:
MultiThreadedHybridClockTabletTest()
: MultiThreadedTabletTest<SETUP>(TabletHarness::Options::ClockType::HYBRID_CLOCK) {
}
};
TYPED_TEST_SUITE(MultiThreadedHybridClockTabletTest, TabletTestHelperTypes);
// Perform many updates and continuously flush and major compact deltas, as
// well as run undo delta gc.
TYPED_TEST(MultiThreadedHybridClockTabletTest, UpdateNoMergeCompaction) {
google::FlagSaver saver;
FLAGS_tablet_history_max_age_sec = 0; // GC data as aggressively as possible.
FLAGS_flusher_backoff = 1.0F;
FLAGS_flusher_initial_frequency_ms = 1;
FLAGS_tablet_delta_store_major_compact_min_ratio = 0.01F;
FLAGS_tablet_delta_store_minor_compact_max = 10;
// Start up our background op threads, targeting the creation of delta files.
this->StartThreads(FLAGS_num_flush_threads,
[this](int i) { this->FlushThread(i); });
this->StartThreads(FLAGS_num_flush_delta_threads,
[this](int i) { this->FlushDeltasThread(i); });
this->StartThreads(FLAGS_num_major_compact_deltas_threads,
[this](int i) { this->MajorCompactDeltasThread(i); });
this->StartThreads(FLAGS_num_undo_delta_gc_threads,
[this](int i) { this->DeleteAncientUndoDeltasThread(i); });
this->StartThreads(FLAGS_num_deleted_rowset_gc_threads,
[this](int i) { this->DeleteAncientDeletedRowsetsThreads(i); });
// Start our workload threads, targeting the creation of deltas that we can
// eventually GC.
this->StartThreads(10,
[this](int i) { this->DeleteAndReinsertCycleThread(i); });
this->StartThreads(10,
[this](int i) { this->StubbornlyUpdateSameRowThread(i); });
// For good measure, we'll also start a thread that scans.
this->StartThreads(FLAGS_num_summer_threads,
[this](int i) { this->SummerThread(i); });
// Run very quickly in dev builds, longer in slow builds.
float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
Stopwatch sw;
sw.start();
while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND &&
!this->HasFatalFailure()) {
SleepFor(MonoDelta::FromMilliseconds(5));
}
// This is sort of a hack -- the flusher thread stops when it sees this
// countdown latch go to 0.
this->running_insert_count_.Reset(0);
this->JoinThreads();
}
} // namespace tablet
} // namespace kudu