blob: 57ef17537ebb6fd42be30373a7cdffdf40287942 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <thread>
#include <vector>
#include <boost/optional/optional.hpp>
#include <glog/logging.h>
#include "kudu/client/client.h"
#include "kudu/client/schema.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/common/txn_id.h"
#include "kudu/gutil/macros.h"
#include "kudu/util/atomic.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/locks.h"
#include "kudu/util/monotime.h"
#include "kudu/util/random.h"
#include "kudu/util/status.h"
namespace kudu {
namespace cluster {
class MiniCluster;
} // namespace cluster
// Utility class for generating a workload against a test cluster.
// The actual data inserted is random, and thus can't be verified for
// integrity. However, this is still useful in conjunction with ClusterVerifier
// to verify that replicas do not diverge.
// The read workload essentially tests read-your-writes. It constantly
// issues snapshot scans in the present and asserts that we see at least as
// many rows as we have written, independently of which replica we choose
// to scan.
class TestWorkload {
// NOTE(review): in this copy of the header several structural lines (closing
// braces, access specifiers, enumerator lists, 'case' labels and a few
// one-line setter bodies) are not visible. The comments below describe only
// what is actually present; anything else is marked as an assumption.

// Declared here and defined out of line. Presumably the initial value of
// table_name_ -- confirm against the constructor.
static const char* const kDefaultTableName;
// Kudu provides two types of partitioning for a table: range partitioning and
// hash partitioning. This enum contains corresponding elements to be used
// to define partitioning type for the test table. Multi-level partitioning
// for the test table automatically created by TestWorkload isn't supported
// yet: as a workaround, point TestWorkload to already existing table
// with desired partitioning type.
// NOTE(review): the enumerator list is not visible here. RANGE is the only
// enumerator referenced in this header (constructor default and
// set_num_tablets()).
enum class PartitioningType {
// Create a test workload object to run against the specified cluster
// using a test table of the specified partitioning type.
// 'partitioning' defaults to PartitioningType::RANGE.
explicit TestWorkload(
cluster::MiniCluster* cluster,
PartitioningType partitioning = PartitioningType::RANGE);
// Ingest the workload as part of the given transaction. set_begin_txn() must
// not be called if this is set.
// The raw integer is wrapped in a TxnId before being stored in txn_id_.
void set_txn_id(int64_t txn_id) {
txn_id_ = TxnId(txn_id);
// Ingest the workload as a part of a new transaction. set_txn_id() must not
// be called if this is set.
void set_begin_txn() {
begin_txn_ = true;
// Commit the transaction that this workload is a part of upon calling
// StopAndJoin(). If set, either set_begin_txn() or set_txn_id() must be set
// as well. If not set, but either set_begin_txn() or set_txn_id() is set,
// the workload will ingest as a part of the transaction, but not call
// commit on completion.
// set_rollback_txn() must not be called if this is set.
void set_commit_txn() {
commit_txn_ = true;
// Abort the transaction that this workload is a part of upon calling
// StopAndJoin(). If set, either set_begin_txn() or set_txn_id() must be set
// as well. If not set, but either set_begin_txn() or set_txn_id() is set,
// the workload will ingest as a part of the transaction, but not call abort
// on completion.
// set_commit_txn() must not be called if this is set.
void set_rollback_txn() {
rollback_txn_ = true;
// Sets whether the read thread should crash if scanning to the cluster fails
// for whatever reason. If set to true, errors will be populated in
// 'read_errors_'.
void set_read_errors_allowed(bool allowed) {
read_errors_allowed_ = allowed;
// Stores the scanner fault-tolerance flag in fault_tolerant_.
// Presumably applied to the scanners opened by the read threads -- confirm
// in ReadThread().
void set_scanner_fault_tolerant(bool fault_tolerant) {
fault_tolerant_ = fault_tolerant;
// Stores the replica selection policy in selection_.
// Presumably used when the read threads scan -- confirm in ReadThread().
void set_scanner_selection(client::KuduClient::ReplicaSelection selection) {
selection_ = selection;
// Sets payload_bytes_ (a boost::optional<int>).
// Presumably the size in bytes of the payload written per row -- confirm in
// WriteThread().
void set_payload_bytes(int n) {
payload_bytes_ = n;
// Sets the number of write threads (num_write_threads_).
void set_num_write_threads(int n) {
num_write_threads_ = n;
// Sets the number of read threads (num_read_threads_).
void set_num_read_threads(int n) {
num_read_threads_ = n;
// Sets write_batch_size_. Presumably the number of rows written per batch --
// confirm in WriteThread().
void set_write_batch_size(int s) {
write_batch_size_ = s;
// Sets write_interval_millis_. Presumably a pause, in milliseconds, between
// write batches -- confirm in WriteThread().
void set_write_interval_millis(int t) {
write_interval_millis_ = t;
// NOTE(review): no body is visible for the next two setters. Judging by
// their names they presumably forward 't' (milliseconds) to client_builder_
// -- confirm against the full header.
void set_client_default_rpc_timeout_millis(int t) {
void set_client_default_admin_operation_timeout_millis(int t) {
// Sets read_timeout_millis_, the read timeout in milliseconds.
void set_read_timeout_millis(int t) {
read_timeout_millis_ = t;
// Sets write_timeout_millis_, the write timeout in milliseconds.
void set_write_timeout_millis(int t) {
write_timeout_millis_ = t;
// Set whether to fail if we see a TimedOut() error inserting a row.
// By default, this triggers a CHECK failure.
void set_timeout_allowed(bool allowed) {
timeout_allowed_ = allowed;
// Sets network_error_allowed_. Presumably the counterpart of
// set_timeout_allowed() for network errors -- confirm in the implementation.
void set_network_error_allowed(bool allowed) {
network_error_allowed_ = allowed;
// Sets remote_error_allowed_. Presumably the counterpart of
// set_timeout_allowed() for remote errors -- confirm in the implementation.
void set_remote_error_allowed(bool allowed) {
remote_error_allowed_ = allowed;
// Set whether to fail if we see a NotFound() error inserting a row.
// This sort of error is triggered if the table is deleted while the workload
// is running.
// By default, this triggers a CHECK failure.
void set_not_found_allowed(bool allowed) {
not_found_allowed_ = allowed;
// Set whether we should attempt to verify the number of rows when scanning.
// An incorrect number of rows may be indicative of a stale read.
// If either set_begin_txn() or set_txn_id() has been called, does not verify
// the number of rows.
void set_verify_num_rows(bool should_verify) {
verify_num_rows_ = should_verify;
// Whether per-row errors with Status::AlreadyPresent() are allowed.
// By default this triggers a check failure.
void set_already_present_allowed(bool allowed) {
already_present_allowed_ = allowed;
// Override the default "simple" schema.
// Declared here; defined out of line.
void set_schema(const client::KuduSchema& schema);
// Sets num_replicas_. Presumably the replication factor of the table created
// by Setup() -- confirm in the implementation.
void set_num_replicas(int r) {
num_replicas_ = r;
// Set the number of tablets for the test table created by this workload.
// In case of range partitioning, the split points are evenly distributed
// through positive int32s.
// CHECK-fails when 'num_tablets' is below the minimum for the partitioning
// type fixed at construction: 1 for RANGE, 2 for anything else.
void set_num_tablets(int num_tablets) {
if (partitioning_ == PartitioningType::RANGE) {
CHECK_GE(num_tablets, 1);
} else {
CHECK_GE(num_tablets, 2);
num_tablets_ = num_tablets;
// Sets the name of the table the workload runs against (table_name_).
void set_table_name(const std::string& table_name) {
table_name_ = table_name;
// Returns a reference to the stored table name; the reference is to the
// member table_name_, not to a copy.
const std::string& table_name() const {
return table_name_;
// Number of distinct keys used by the duplicate-key write pattern described
// in WritePattern below.
static const int kNumRowsForDuplicateKeyWorkload = 20;
// Write patterns selectable through set_write_pattern().
// NOTE(review): only the per-enumerator comments are visible here; the
// enumerator names themselves are not.
enum WritePattern {
// The default: insert random row keys. This may cause an occasional
// duplicate, but with 32-bit keys, they won't be frequent.
// Insert random rows, then delete them.
// This may cause an occasional duplicate, but with 32-bit keys, they won't be frequent.
// This requires two flush operations.
// All threads generate updates against a single row.
// Insert rows in random order, but restricted to only
// kNumRowsForDuplicateKeyWorkload unique keys. This ensures that,
// after a very short initial warm-up period, all inserts fail with
// duplicate keys.
// Insert sequential rows.
// This causes flushes but no compactions.
// Insert sequential rows, then delete them.
// Records the requested pattern in write_pattern_ and then switches on it.
// Only the default branch is visible here: it aborts via LOG(FATAL) for an
// unsupported value.
// NOTE(review): the 'case' labels are not visible in this view -- confirm
// what each supported pattern does in addition to being stored.
void set_write_pattern(WritePattern pattern) {
write_pattern_ = pattern;
switch (pattern) {
default: LOG(FATAL) << "Unsupported WritePattern.";
// Returns a shared pointer to a KuduClient. Declared here; defined out of
// line. Presumably built from client_builder_ -- confirm in the
// implementation.
client::sp::shared_ptr<client::KuduClient> CreateClient();
// Sets up the internal client and creates the table which will be used for
// writing, if it doesn't already exist.
void Setup();
// Start the write workload.
void Start();
// Stop the writers and wait for them to exit.
void StopAndJoin();
// Delete created table, etc.
// Returns a Status describing the outcome.
Status Cleanup();
// Returns the raw integer value held by txn_id_.
int64_t txn_id() const {
return txn_id_.value();
// Return the number of rows inserted so far. This may be called either
// during or after the write workload. If writing as a part of a transaction,
// these rows may have not been committed.
int64_t rows_inserted() const {
return rows_inserted_.Load();
// Return the number of rows deleted so far. This may be called either
// during or after the write workload.
int64_t rows_deleted() const {
return rows_deleted_.Load();
// Return the number of batches in which we have successfully inserted at
// least one row.
// NOTE: it is not safe to assume that this is exactly equal to the number
// of log operations generated on the TS side. The client may split a single
// Flush() call into multiple batches.
int64_t batches_completed() const {
return batches_completed_.Load();
// Returns a copy of the errors seen by the read threads so far.
// read_error_lock_ is held for the duration of the copy.
std::vector<Status> read_errors() const {
std::lock_guard<simple_spinlock> l(read_error_lock_);
return read_errors_;
// Returns a copy of the shared pointer to the internal client (client_).
client::sp::shared_ptr<client::KuduClient> client() const { return client_; }
// Declared here; defined out of line. 'table' is an out-parameter that
// presumably receives the opened table -- confirm in the implementation.
void OpenTable(client::sp::shared_ptr<client::KuduTable>* table);
// Presumably the entry points run by the threads stored in threads_ --
// confirm in Start().
void WriteThread();
void ReadThread();
// Declared here; defined out of line. Presumably returns the number of
// pending errors on 'session' -- confirm in the implementation.
size_t GetNumberOfErrors(client::KuduSession* session);
// Raw pointer to the cluster under test; ownership is not visible from this
// header.
cluster::MiniCluster* cluster_;
// Partitioning type of the test table; const, so fixed at construction.
const PartitioningType partitioning_;
client::KuduClientBuilder client_builder_;
// Client handed out by client().
client::sp::shared_ptr<client::KuduClient> client_;
ThreadSafeRandom rng_;
// Values stored by the corresponding set_*() methods above.
boost::optional<int> payload_bytes_;
int num_write_threads_;
int num_read_threads_;
int read_timeout_millis_;
int write_batch_size_;
int write_interval_millis_;
int write_timeout_millis_;
// Transaction settings, see set_txn_id(), set_begin_txn(), set_commit_txn()
// and set_rollback_txn().
TxnId txn_id_;
bool begin_txn_;
bool commit_txn_;
bool rollback_txn_;
// Scan and error-tolerance flags stored by the corresponding setters.
bool fault_tolerant_;
bool verify_num_rows_;
bool read_errors_allowed_;
bool timeout_allowed_;
bool not_found_allowed_;
bool already_present_allowed_;
bool network_error_allowed_;
bool remote_error_allowed_;
WritePattern write_pattern_;
client::KuduClient::ReplicaSelection selection_;
// Table description: schema, replica count, tablet count and name.
client::KuduSchema schema_;
int num_replicas_;
int num_tablets_;
std::string table_name_;
// Presumably used to coordinate the start and stop of the worker threads --
// confirm in Start() / StopAndJoin().
CountDownLatch start_latch_;
AtomicBool should_run_;
// Counters behind rows_inserted(), rows_deleted() and batches_completed().
AtomicInt<int64_t> rows_inserted_;
AtomicInt<int64_t> rows_deleted_;
AtomicInt<int64_t> batches_completed_;
// Presumably the key source for the sequential write patterns -- confirm in
// WriteThread().
AtomicInt<int32_t> sequential_key_gen_;
// Transaction handle; presumably populated when the workload ingests as part
// of a transaction -- confirm in Setup() / Start().
client::sp::shared_ptr<client::KuduTransaction> txn_;
// Worker threads of the workload.
std::vector<std::thread> threads_;
// Taken by read_errors() while copying read_errors_; 'mutable' so that the
// const accessor can lock it.
mutable simple_spinlock read_error_lock_;
std::vector<Status> read_errors_;
} // namespace kudu