// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <functional>
#include <initializer_list>
#include <memory>
#include <numeric>
#include <ostream>
#include <string>
#include <thread>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/client/client.h"
#include "kudu/client/schema.h"
#include "kudu/client/write_op.h"
#include "kudu/common/partial_row.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/sysinfo.h"
#include "kudu/mini-cluster/internal_mini_cluster.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/util/hdr_histogram.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/spinlock_profiling.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
DEFINE_int32(num_inserter_threads, 32,
"number of threads performing concurrent inserts into the tablets");
DEFINE_int32(runtime_sec, 5, "how long to run test scenarios (in seconds)");
DECLARE_bool(log_inject_latency);
DECLARE_int32(group_commit_queue_size_bytes);
DECLARE_int32(log_inject_latency_ms_mean);
DECLARE_int32(log_inject_latency_ms_stddev);
DECLARE_int32(log_segment_size_mb);
DECLARE_int32(max_num_columns);
DECLARE_int32(rpc_num_service_threads);
DECLARE_int32(rpc_service_queue_length);
DECLARE_int32(tablet_inject_latency_on_apply_write_op_ms);
DECLARE_uint32(tablet_apply_pool_overload_threshold_ms);
METRIC_DECLARE_counter(op_apply_queue_overload_rejections);
METRIC_DECLARE_counter(rpcs_queue_overflow);
METRIC_DECLARE_counter(rpcs_timed_out_in_queue);
METRIC_DECLARE_gauge_uint64(spinlock_contention_time);
METRIC_DECLARE_histogram(handler_latency_kudu_consensus_ConsensusService_UpdateConsensus);
METRIC_DECLARE_histogram(handler_latency_kudu_tserver_TabletServerService_Write);
METRIC_DECLARE_histogram(op_apply_queue_time);
METRIC_DECLARE_histogram(op_apply_run_time);
METRIC_DECLARE_histogram(reactor_active_latency_us);
METRIC_DECLARE_histogram(rpc_incoming_queue_time);
using kudu::KuduPartialRow;
using kudu::client::KuduClient;
using kudu::client::KuduError;
using kudu::client::KuduInsert;
using kudu::client::KuduSchema;
using kudu::client::KuduSchemaBuilder;
using kudu::client::KuduSession;
using kudu::client::KuduTable;
using kudu::client::KuduTableCreator;
using kudu::cluster::InternalMiniCluster;
using kudu::cluster::InternalMiniClusterOptions;
using std::endl;
using std::ostringstream;
using std::string;
using std::thread;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace itest {
// The purpose of this test is to check for lock contention when running
// multiple write operations on the same tablet concurrently. Multiple threads
// are involved in pushing the Raft consensus updates that correspond to the
// incoming write requests, and the test pinpoints the contention among those
// threads.
class SameTabletConcurrentWritesBaseTest: public KuduTest {
public:
explicit SameTabletConcurrentWritesBaseTest(int num_columns)
: num_columns_(num_columns),
do_run_(true) {
}
void SetUp() override {
KuduTest::SetUp();
// Use a schema with multiple string columns to make rows heavier.
KuduSchemaBuilder schema_builder;
schema_builder.AddColumn("key")->Type(client::KuduColumnSchema::INT64)->
NotNull()->PrimaryKey();
for (auto i = 1; i < num_columns_; ++i) {
schema_builder.AddColumn(Substitute("col$0", i))->
Type(client::KuduColumnSchema::STRING)->NotNull();
}
ASSERT_OK(schema_builder.Build(&schema_));
}
Status Prepare(int num_tablet_servers) {
InternalMiniClusterOptions opts;
opts.num_tablet_servers = num_tablet_servers;
cluster_.reset(new InternalMiniCluster(env_, opts));
RETURN_NOT_OK(cluster_->Start());
return CreateTestTable(num_tablet_servers);
}
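// Starts 'num_inserter_threads' inserter threads, lets them run for the
// specified 'runtime', then signals them to stop via 'do_run_' and joins them.
// Per-thread insert counts are reported through 'counters'.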
void Run(size_t num_inserter_threads,
const MonoDelta& runtime,
vector<thread>* threads,
vector<size_t>* counters) {
ASSERT_TRUE(threads->empty());
threads->reserve(num_inserter_threads);
ASSERT_EQ(num_inserter_threads, counters->size());
vector<Status> statuses(num_inserter_threads);
for (auto idx = 0; idx < num_inserter_threads; ++idx) {
threads->emplace_back(&SameTabletConcurrentWritesBaseTest::InserterTask,
this,
idx,
num_inserter_threads,
&(statuses[idx]),
&((*counters)[idx]));
}
SleepFor(runtime);
do_run_ = false;
for_each(threads->begin(), threads->end(), [](thread& t) { t.join(); });
for (const auto& s : statuses) {
EXPECT_OK(s);
}
}
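// Creates the test table with two hash-partitioned tablets on the 'key'
// column, replicated across 'num_replicas' tablet servers.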
Status CreateTestTable(int num_replicas) {
client::sp::shared_ptr<KuduClient> client;
RETURN_NOT_OK(cluster_->CreateClient(nullptr, &client));
unique_ptr<KuduTableCreator> table_creator(client->NewTableCreator());
return table_creator->table_name(kTableName)
.schema(&schema_)
.num_replicas(num_replicas)
.add_hash_partitions({ "key" }, 2)
.Create();
}
void InserterTask(size_t task_idx,
int64_t key_increment,
Status* result_status,
size_t* counter) {
using client::sp::shared_ptr;
static constexpr const char kValPattern[] =
"$0.00000000000000000000000000000000000000000000000000000000000000000"
"$0.9876543210123456789"
"$0.11111111111111111111111111111111111111111111111111111111111111111"
"$0.0123456789abcdef.$0.abcdef0123456789."
"$0.22222222222222222222222222222222222222222222222222222222222222222"
"$0.`1234567890-=qwertyuioop[]asdfghjkl;'zxcvbnm,./."
"$0.33333333333333333333333333333333333333333333333333333333333333333"
"$0.9876543210123456789"
"$0.44444444444444444444444444444444444444444444444444444444444444444"
"$0.9876543210123456789"
"$0.55555555555555555555555555555555555555555555555555555555555555555"
"$0.9876543210123456789"
"$0.66666666666666666666666666666666666666666666666666666666666666666"
"$0.9876543210123456789"
"$0.77777777777777777777777777777777777777777777777777777777777777777"
"$0.9876543210123456789"
"$0.88888888888888888888888888888888888888888888888888888888888888888"
"$0.9876543210123456789"
"$0.99999999999999999999999999999999999999999999999999999999999999999";
#define RET_IF_NOT_OK(_s, _session) \
do { \
const Status s = (_s); \
Status status = s; \
if (!s.ok()) { \
const shared_ptr<KuduSession>& ses((_session)); \
if (ses) { \
vector<KuduError*> errors; \
ElementDeleter deleter(&errors); \
ses->GetPendingErrors(&errors, nullptr); \
if (!errors.empty()) { \
for (const auto* err : errors) { \
status = status.CloneAndAppend(err->status().message()); \
} \
} \
} \
*result_status = status; \
return; \
} \
} while (false)
shared_ptr<KuduClient> client;
shared_ptr<KuduSession> session;
RET_IF_NOT_OK(cluster_->CreateClient(nullptr, &client), session);
session = client->NewSession();
// Flush upon applying a row: this is to flood tablet servers with
// requests.
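// With AUTO_FLUSH_SYNC, Apply() returns only after the operation has been
// flushed to the server, so each inserted row results in its own write RPC.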
RET_IF_NOT_OK(session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC), session);
session->SetTimeoutMillis(15 * 1000);
shared_ptr<KuduTable> table;
RET_IF_NOT_OK(client->OpenTable(kTableName, &table), session);
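// Start at this task's index and advance the key by the total number of
// inserter threads ('key_increment') so concurrent tasks never collide on
// the same primary key.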
int64_t i = task_idx;
while (do_run_) {
unique_ptr<KuduInsert> insert(table->NewInsert());
KuduPartialRow* row = insert->mutable_row();
RET_IF_NOT_OK(row->SetInt64(0, i), session);
i += key_increment;
for (auto idx = 1; idx < num_columns_; ++idx) {
RET_IF_NOT_OK(row->SetString(idx, Substitute(kValPattern, i)), session);
}
RET_IF_NOT_OK(session->Apply(insert.release()), session);
++(*counter);
}
*result_status = Status::OK();
}
protected:
static constexpr const char* const kTableName = "test";
const int num_columns_;
InternalMiniClusterOptions opts_;
std::unique_ptr<InternalMiniCluster> cluster_;
KuduSchema schema_;
std::atomic<bool> do_run_;
};
class SameTabletConcurrentWritesTest: public SameTabletConcurrentWritesBaseTest {
public:
SameTabletConcurrentWritesTest()
: SameTabletConcurrentWritesBaseTest(250/*num_columns*/) {
}
};
// Run many inserters into the same tablet when WAL sync calls are slow due to
// injected latency; report on metrics like spinlock contention cycles and
// the number of RPC queue overflows.
TEST_F(SameTabletConcurrentWritesTest, InsertsOnly) {
SKIP_IF_SLOW_NOT_ALLOWED();
constexpr int kNumTabletServers = 3;
// Custom settings for kudu-tserver's flags.
//
// Inject latency into WAL sync operations. This is to allow the point of
// lock contention in RaftConsensus::UpdateReplica() to manifest itself.
FLAGS_log_inject_latency = true;
FLAGS_log_inject_latency_ms_mean = 200;
FLAGS_log_inject_latency_ms_stddev = 0;
// Another point of lock contention manifests itself when RPC service threads
// try to obtain a lock already taken by another thread while the latter is
// waiting for space to free up in the WAL's entry batch queue. Instead of
// sending a huge amount of data to fill up the queue, reduce its capacity so
// that the lock contention manifests itself in this test scenario.
FLAGS_group_commit_queue_size_bytes = 64;
// Keep log segments small to hit the injected latency as often as possible.
FLAGS_log_segment_size_mb = 1;
// To make the lock contention manifest itself at the side where RPCs are
// arriving, decrease the number of RPC workers and make the RPC queue
// shorter. An alternative approach might be to create many more concurrent
// writer threads and write much more data.
FLAGS_rpc_num_service_threads = 2;
FLAGS_rpc_service_queue_length = 3;
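// With only 2 service threads and a queue of 3 slots, bursts of concurrent
// write RPCs are expected to overflow the queue; that is what the
// 'rpcs_queue_overflow' counter examined below tracks.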
ASSERT_OK(Prepare(kNumTabletServers));
const auto runtime = MonoDelta::FromSeconds(FLAGS_runtime_sec);
const size_t num_inserter_threads = FLAGS_num_inserter_threads;
vector<thread> threads;
threads.reserve(num_inserter_threads);
vector<size_t> counters(num_inserter_threads, 0);
NO_FATALS(Run(num_inserter_threads, runtime, &threads, &counters));
int64_t num_queue_overflows = 0;
for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
const auto& ent = cluster_->mini_tablet_server(i)->server()->metric_entity();
// spinlock_contention_time is the same for each tablet server because
// this test scenario uses an all-in-one-process internal mini-cluster.
auto gauge = METRIC_spinlock_contention_time.InstantiateFunctionGauge(
ent, []() { return GetSpinLockContentionMicros(); });
LOG(INFO) << "spinlock_contention_time for tserver " << i
<< ": " << gauge->value();
for (auto* elem : {
&METRIC_rpcs_queue_overflow,
&METRIC_rpcs_timed_out_in_queue,
}) {
auto counter = elem->Instantiate(ent);
const char* const name = counter->prototype()->name();
if (strcmp("rpcs_queue_overflow", name) == 0) {
num_queue_overflows += counter->value();
}
LOG(INFO) << "Counter value for tserver " << i
<< " on " << name << ": " << counter->value();
}
for (auto* elem : {
&METRIC_handler_latency_kudu_consensus_ConsensusService_UpdateConsensus,
&METRIC_handler_latency_kudu_tserver_TabletServerService_Write,
&METRIC_op_apply_queue_time,
&METRIC_op_apply_run_time,
&METRIC_reactor_active_latency_us,
&METRIC_rpc_incoming_queue_time,
}) {
auto hist = elem->Instantiate(ent);
ostringstream ostr;
ostr << "Stats for tserver " << i << " on " << elem->name() << ":" << endl;
hist->histogram()->DumpHumanReadable(&ostr);
LOG(INFO) << ostr.str();
}
}
const double total = accumulate(counters.begin(), counters.end(), 0UL);
LOG(INFO) << Substitute(
"write RPC request rate: $0 req/sec",
total / runtime.ToSeconds());
LOG(INFO) << Substitute(
"total count of RPC queue overflows: $0", num_queue_overflows);
}
class OpApplyQueueOverloadedTest: public SameTabletConcurrentWritesBaseTest {
public:
OpApplyQueueOverloadedTest()
: SameTabletConcurrentWritesBaseTest(1/*num_columns*/) {
}
};
// The essence of this test scenario is to make sure that a tablet server
// responds with an appropriate error status when rejecting a write operation
// because its op apply queue is overloaded. A client should automatically
// retry the rejected operations and eventually succeed with its workload
// (the latter, of course, depends on the configured session timeout).
//
// This scenario injects a delay into the apply phase of every write operation
// and runs many clients which insert data into the same tablet. The overload
// threshold for the apply queue is set very close to the injected delay,
// so many write operations are rejected due to the op apply queue being
// overloaded. Nevertheless, every client eventually succeeds with its workload
// because the rejected write operations are automatically retried.
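// (The rejection reaches the client as a retriable error status, presumably
// something like ServiceUnavailable, which the client retries transparently
// until the session timeout expires.)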
TEST_F(OpApplyQueueOverloadedTest, ClientRetriesOperations) {
SKIP_IF_SLOW_NOT_ALLOWED();
constexpr int kNumTabletServers = 3;
// A couple of the settings below might be overridden from the command line by
// specifying the corresponding flags; this scenario sets reasonable defaults.
// The op apply threadpool's max_threads is set to base::NumCPUs(), but we
// want many more inserters to induce the overload of the op apply
// queue faster.
const int num_cpus = 3 * base::NumCPUs();
ASSERT_NE("", SetCommandLineOptionWithMode("num_inserter_threads",
std::to_string(num_cpus).c_str(),
google::SET_FLAG_IF_DEFAULT));
// The workload should run long enough to allow the apply queue to detect
// the overload condition. Once the overload condition is detected, the
// tablet server starts rejecting incoming write requests.
ASSERT_NE("", SetCommandLineOptionWithMode("runtime_sec",
"3",
google::SET_FLAG_IF_DEFAULT));
// Custom settings for kudu-tserver's flags.
//
// Inject latency into the op 'apply' phase and set the apply queue's
// overload threshold to the same value. This is to make tablet servers
// reject many write operations due to the overload of the op apply queue.
FLAGS_tablet_apply_pool_overload_threshold_ms = 100;
FLAGS_tablet_inject_latency_on_apply_write_op_ms = 100;
// Also, set the RPC queue length very high to avoid queue overflows. The
// idea is to put as much pressure on the apply queue as possible.
FLAGS_rpc_service_queue_length = 1000;
// Custom settings for kudu-master's flags.
//
// Set the RPC queue length very high to avoid queue overflows: the number
// of clients working concurrently might be too high for the default settings
// of the master's RPC queue length.
FLAGS_rpc_service_queue_length = 1000;
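// NOTE: since the InternalMiniCluster runs masters and tablet servers in the
// same process, both assignments above update the very same gflag; the two
// comments merely spell out why the high value matters for each daemon.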
ASSERT_OK(Prepare(kNumTabletServers));
const auto runtime = MonoDelta::FromSeconds(FLAGS_runtime_sec);
const size_t num_inserter_threads = FLAGS_num_inserter_threads;
vector<thread> threads;
threads.reserve(num_inserter_threads);
vector<size_t> counters(num_inserter_threads, 0);
NO_FATALS(Run(num_inserter_threads, runtime, &threads, &counters));
int64_t num_rejections = 0;
for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
const auto& ent = cluster_->mini_tablet_server(i)->server()->metric_entity();
for (auto* elem : {
&METRIC_op_apply_queue_overload_rejections,
}) {
auto counter = elem->Instantiate(ent);
const char* const name = counter->prototype()->name();
LOG(INFO) << "Counter value for tserver " << i
<< " on " << name << ": " << counter->value();
num_rejections += counter->value();
}
for (auto* elem : {
&METRIC_op_apply_queue_time,
&METRIC_op_apply_run_time,
}) {
auto hist = elem->Instantiate(ent);
ostringstream ostr;
ostr << "Stats for tserver " << i << " on " << elem->name() << ":" << endl;
hist->histogram()->DumpHumanReadable(&ostr);
LOG(INFO) << ostr.str();
}
}
// Just for information, print out the resulting request rate.
const double total = accumulate(counters.begin(), counters.end(), 0UL);
LOG(INFO) << Substitute(
"write RPC request rate: $0 req/sec",
total / runtime.ToSeconds());
// At least a few write operations should be rejected due to the overloaded
// apply queue.
ASSERT_GT(num_rejections, 0);
}
} // namespace itest
} // namespace kudu