KUDU-3265 txn-related options to 'kudu perf loadgen'
This patch adds the following txn-related options to the
'kudu perf loadgen' tool:
(1) --txn_start
(2) --txn_commit
(3) --txn_rollback
With those, it's possible to insert rows into the table in the scope
of a multi-row transaction, and then (a) leave the transaction
as is, or (b) commit the transaction, or (c) roll back the transaction
explicitly.
Semantically, specifying --txn_commit or --txn_rollback implies setting
--txn_start as well.
This patch partially addresses KUDU-3265. A follow-up patch will add
functionality to write and read transactional tokens in serialized form,
which will make the newly introduced --txn_start option more useful.
With that, it will be possible to start a new multi-row transaction
in one session of the kudu CLI tool, and then continue adding more rows
in another session of the kudu CLI tool by passing the serialized
transactional token between the sessions.
Change-Id: I73750af589d432c6ae973439264f55c08a327aa7
Reviewed-on: http://gerrit.cloudera.org:8080/17526
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <awong@cloudera.com>
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index f619251..528347f 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -2607,6 +2607,166 @@
ASSERT_OK(WaitForNumTabletsOnTS(ts, expected_tablets, kTimeout));
}
+// Run 'kudu perf loadgen' with txn-related flags; verify row visibility and txn states.
+TEST_F(ToolTest, LoadgenTxnBasics) {
+ {
+ ExternalMiniClusterOptions opts;
+ // Prefer lighter cluster to speed up testing.
+ opts.num_tablet_servers = 1;
+ // Since txn-related functionality is not enabled by default for a while,
+ // a couple of flags should be overridden to allow running test scenarios
+ // in txn-enabled environment. Also, set the txn keepalive interval to a
+ // higher value: in the end of this scenario it's expected to have a couple
+ // of open transactions, but everything might run slower than expected with
+ // TSAN-enabled binaries run on inferior hardware or VMs.
+ opts.extra_master_flags.emplace_back("--txn_manager_enabled");
+ opts.extra_master_flags.emplace_back("--txn_manager_status_table_num_replicas=1");
+ opts.extra_tserver_flags.emplace_back("--enable_txn_system_client_init");
+ opts.extra_tserver_flags.emplace_back("--txn_keepalive_interval_ms=120000");
+ NO_FATALS(StartExternalMiniCluster(std::move(opts)));
+ }
+ const vector<string> base_args = {
+ "perf", "loadgen",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ // Let's have a control over the number of rows inserted.
+ "--num_threads=2",
+ "--num_rows_per_thread=111",
+ };
+
+ // '--txn_start' works as expected when running against txn-enabled cluster.
+ {
+ vector<string> args(base_args);
+ args.emplace_back("--txn_start");
+ ASSERT_OK(RunKuduTool(args));
+
+ // An extra run to check for the number of visible rows: there should be
+ // none since the transaction isn't committed.
+ args.emplace_back("--run_scan");
+ string out;
+ ASSERT_OK(RunKuduTool(args, &out));
+ ASSERT_STR_MATCHES(out, "expected rows: 0");
+ ASSERT_STR_MATCHES(out, "actual rows : 0");
+ }
+
+ // '--txn_start' and '--txn_commit' combination works as expected.
+ {
+ vector<string> args(base_args);
+ args.emplace_back("--txn_start");
+ args.emplace_back("--txn_commit");
+ ASSERT_OK(RunKuduTool(args));
+
+ // An extra run to check for the number of visible rows: all inserted rows
+ // should be visible now since the transaction is to be committed.
+ args.emplace_back("--run_scan");
+ string out;
+ ASSERT_OK(RunKuduTool(args, &out));
+ ASSERT_STR_MATCHES(out, "expected rows: 222");
+ ASSERT_STR_MATCHES(out, "actual rows : 222");
+ }
+
+ // '--txn_start' and '--txn_rollback' combination works as expected.
+ {
+ vector<string> args(base_args);
+ args.emplace_back("--txn_start");
+ args.emplace_back("--txn_rollback");
+ ASSERT_OK(RunKuduTool(args));
+
+ // An extra run to check for the number of visible rows: there should be
+ // none since the transaction is to be rolled back.
+ args.emplace_back("--run_scan");
+ string out;
+ ASSERT_OK(RunKuduTool(args, &out));
+ ASSERT_STR_MATCHES(out, "expected rows: 0");
+ ASSERT_STR_MATCHES(out, "actual rows : 0");
+ }
+
+ // Supplying '--txn_rollback' implies '--txn_start'.
+ {
+ vector<string> args(base_args);
+ args.emplace_back("--txn_rollback");
+ ASSERT_OK(RunKuduTool(args));
+
+ // An extra run to check for the number of visible rows: there should be
+ // none since the transaction is to be rolled back.
+ args.emplace_back("--run_scan");
+ string out;
+ ASSERT_OK(RunKuduTool(args, &out));
+ ASSERT_STR_MATCHES(out, "expected rows: 0");
+ ASSERT_STR_MATCHES(out, "actual rows : 0");
+ }
+
+ // Supplying '--txn_commit' implies '--txn_start'.
+ {
+ vector<string> args(base_args);
+ args.emplace_back("--txn_commit");
+ ASSERT_OK(RunKuduTool(args));
+
+ // An extra run to check for the number of visible rows: all inserted rows
+ // should be visible now since the transaction is to be committed.
+ args.emplace_back("--run_scan");
+ string out;
+ ASSERT_OK(RunKuduTool(args, &out));
+ ASSERT_STR_MATCHES(out, "expected rows: 222");
+ ASSERT_STR_MATCHES(out, "actual rows : 222");
+ }
+
+ // Mixing any txn-related flag and '--use_upsert' isn't possible since only
+ // INSERT/INSERT_IGNORE are supported as transactional operations for now.
+ {
+ for (const auto& f : { "--txn_start", "--txn_commit", "--txn_rollback" }) {
+ SCOPED_TRACE(Substitute("running with flag: $0", f));
+ vector<string> args(base_args);
+ args.emplace_back("--use_upsert");
+ args.emplace_back(f);
+ string err;
+ const auto s = RunKuduTool(args, nullptr, &err);
+ ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+ ASSERT_STR_CONTAINS(
+ err, "only inserts are supported as transactional operations");
+ ASSERT_STR_CONTAINS(err, "remove/unset the --use_upsert flag");
+ }
+ }
+
+ // Mixing '--txn_commit' and '--txn_rollback' doesn't make sense.
+ {
+ vector<string> args(base_args);
+ args.emplace_back("--txn_commit");
+ args.emplace_back("--txn_rollback");
+ string err;
+ const auto s = RunKuduTool(args, nullptr, &err);
+ ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+ ASSERT_STR_CONTAINS(err, "both --txn_commit and --txn_rollback are set");
+ ASSERT_STR_CONTAINS(err, "unset one of those to resolve the conflict");
+ }
+
+ // Run 'kudu txn list' to see whether the transactions are in expected
+ // state after running the sub-scenarios above.
+ {
+ const vector<string> args = {
+ "txn", "list",
+ cluster_->master()->bound_rpc_addr().ToString(),
+ "--included_states=*",
+ "--columns=state",
+ };
+ string out;
+ ASSERT_OK(RunKuduTool(args, &out));
+ ASSERT_STR_MATCHES(out,
+R"(state
+-----------
+ OPEN
+ OPEN
+ COMMITTED
+ COMMITTED
+ ABORTED
+ ABORTED
+ ABORTED
+ ABORTED
+ COMMITTED
+ COMMITTED
+)");
+ }
+}
+
// Test that a non-random workload results in the behavior we would expect when
// running against an auto-generated range partitioned table.
TEST_F(ToolTest, TestNonRandomWorkloadLoadgen) {
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index 1129986..0efce37 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -237,7 +237,9 @@
using kudu::client::KuduSession;
using kudu::client::KuduTable;
using kudu::client::KuduTableCreator;
+using kudu::client::KuduTransaction;
using kudu::client::KuduWriteOperation;
+using kudu::client::sp::shared_ptr;
using kudu::clock::LogicalClock;
using kudu::consensus::ConsensusBootstrapInfo;
using kudu::consensus::ConsensusMetadata;
@@ -254,7 +256,6 @@
using std::mutex;
using std::numeric_limits;
using std::ostringstream;
-using std::shared_ptr;
using std::string;
using std::thread;
using std::unique_ptr;
@@ -366,12 +367,25 @@
DEFINE_bool(use_upsert, false,
"Whether to use UPSERT instead of INSERT to store the generated "
"data into the table");
+DEFINE_bool(txn_start, false,
+ "Whether the generated rows are inserted in the context of a "
+ "multi-row transaction. For now, only supported if configured "
+ "to insert rows (i.e. UPDATE and DELETE are not yet supported). "
+ "Unless --txn_commit or --txn_rollback is specified, the started "
+ "transaction is left 'as is' (i.e. neither commit, nor rollback "
+ "is performed) once the tool completes its operations.");
+DEFINE_bool(txn_commit, false,
+ "Whether to commit the multi-row transaction which contains all "
+ "the inserted rows. Setting --txn_commit=true implies setting "
+ "--txn_start=true as well.");
+DEFINE_bool(txn_rollback, false,
+ "Whether to rollback the multi-row transaction which contains all "
+ "the inserted rows. Setting --txn_rollback=true implies setting "
+ "--txn_start=true as well.");
namespace kudu {
namespace tools {
-using ClientFactory = std::function<client::sp::shared_ptr<KuduClient>()>;
-
namespace {
bool ValidatePartitionFlags() {
@@ -387,6 +401,24 @@
}
GROUP_FLAG_VALIDATOR(partition_flags, &ValidatePartitionFlags);
+// Txn flag check: reject --use_upsert with any txn flag, and commit together with rollback.
+bool ValidateTxnFlags() {
+ const bool txn_requested = FLAGS_txn_start || FLAGS_txn_commit || FLAGS_txn_rollback;
+ if (txn_requested && FLAGS_use_upsert) {
+ LOG(ERROR) << Substitute("only inserts are supported as transactional "
+ "operations for now: either remove/unset the "
+ "--use_upsert flag or remove/unset txn-related "
+ "flags");
+ } else if (FLAGS_txn_rollback && FLAGS_txn_commit) {
+ LOG(ERROR) << Substitute("both --txn_commit and --txn_rollback are set: "
+ "unset one of those to resolve the conflict");
+ } else {
+ return true;
+ }
+ return false;
+}
+GROUP_FLAG_VALIDATOR(txn_flags, &ValidateTxnFlags);
+
const char* OpTypeToString(KuduWriteOperation::Type op_type) {
switch (op_type) {
case KuduWriteOperation::INSERT:
@@ -568,7 +600,7 @@
uint64_t latest_observed_timestamp = 0;
};
-WriteResults GeneratorThread(const client::sp::shared_ptr<KuduClient>& client,
+WriteResults GeneratorThread(const shared_ptr<KuduSession>& session,
const string& table_name,
size_t gen_idx,
KuduWriteOperation::Type op_type) {
@@ -589,7 +621,8 @@
const size_t flush_per_n_rows = FLAGS_flush_per_n_rows;
const uint64_t gen_seq_start = FLAGS_seq_start;
- client::sp::shared_ptr<KuduSession> session(client->NewSession());
+ auto* client = session->client();
+ DCHECK(client);
int64_t idx = 0;
auto generator = [&]() {
@@ -607,7 +640,7 @@
flush_per_n_rows == 0 ? KuduSession::AUTO_FLUSH_BACKGROUND
: KuduSession::MANUAL_FLUSH));
- client::sp::shared_ptr<KuduTable> table;
+ shared_ptr<KuduTable> table;
RETURN_NOT_OK(client->OpenTable(table_name, &table));
// Planning for non-intersecting ranges for different generator threads
@@ -671,22 +704,24 @@
return results;
}
-WriteResults GenerateWriteRows(const ClientFactory& client_factory,
+WriteResults GenerateWriteRows(const vector<shared_ptr<KuduSession>>& sessions,
const string& table_name,
KuduWriteOperation::Type op_type) {
DCHECK(op_type == KuduWriteOperation::INSERT ||
op_type == KuduWriteOperation::DELETE ||
op_type == KuduWriteOperation::UPSERT);
- const size_t gen_num = FLAGS_num_threads;
+ const size_t gen_num = sessions.size();
vector<WriteResults> results(gen_num);
vector<thread> threads;
+ threads.reserve(gen_num);
Stopwatch sw(Stopwatch::ALL_THREADS);
sw.start();
for (size_t i = 0; i < gen_num; ++i) {
- auto client = client_factory();
threads.emplace_back(
- [=, &results]() { results[i] = GeneratorThread(client, table_name, i, op_type); });
+ [=, &results]() {
+ results[i] = GeneratorThread(sessions[i], table_name, i, op_type);
+ });
}
for (auto& t : threads) {
t.join();
@@ -731,7 +766,7 @@
// Fetch all rows from the table with the specified name; iterate over them
// and output their total count.
-Status CountTableRows(const client::sp::shared_ptr<KuduClient>& client,
+Status CountTableRows(const shared_ptr<KuduClient>& client,
const string& table_name, uint64_t* count) {
TableScanner scanner(client, table_name);
scanner.SetReadMode(KuduScanner::ReadMode::READ_YOUR_WRITES);
@@ -744,15 +779,16 @@
}
Status TestLoadGenerator(const RunnerContext& context) {
- client::sp::shared_ptr<KuduClient> client;
+ shared_ptr<KuduClient> client;
RETURN_NOT_OK(CreateKuduClient(context, &client));
+ const size_t gen_num = FLAGS_num_threads;
string table_name;
bool is_auto_table = false;
if (!FLAGS_table_name.empty()) {
table_name = FLAGS_table_name;
} else {
- static const string kKeyColumnName = "key";
+ constexpr const char* const kKeyColumnName = "key";
// The auto-created table case.
is_auto_table = true;
@@ -777,7 +813,7 @@
// tablets. In case we're inserting in random mode, use unbounded range
// partitioning, so the table has key coverage of the entire keyspace.
const int64_t total_inserted_span =
- SpanPerThread(KuduSchema::ToSchema(schema).num_key_columns()) * FLAGS_num_threads;
+ SpanPerThread(KuduSchema::ToSchema(schema).num_key_columns()) * gen_num;
const int64_t span_per_range =
total_inserted_span / FLAGS_table_num_range_partitions;
table_creator->set_range_partition_columns({ kKeyColumnName });
@@ -797,30 +833,65 @@
cout << "Using " << (is_auto_table ? "auto-created " : "")
<< "table '" << table_name << "'" << endl;
- ClientFactory client_factory = [&]() {
- if (!FLAGS_use_client_per_thread) {
- return client;
+ const bool is_transactional =
+ FLAGS_txn_start || FLAGS_txn_rollback || FLAGS_txn_commit;
+ shared_ptr<KuduTransaction> txn;
+ if (is_transactional) {
+ RETURN_NOT_OK(client->NewTransaction(&txn));
+ }
+
+ // Create a session per generator thread. A KuduSession object keeps a
+ // reference to its client handle, so there is no need to keep references
+ // to the newly created client objects themselves.
+ vector<shared_ptr<KuduSession>> sessions;
+ sessions.reserve(gen_num);
+ for (size_t i = 0; i < gen_num; ++i) {
+ shared_ptr<KuduClient> c;
+ if (FLAGS_use_client_per_thread) {
+ RETURN_NOT_OK(CreateKuduClient(context, &c));
+ } else {
+ c = client;
}
- client::sp::shared_ptr<KuduClient> client;
- CHECK_OK(CreateKuduClient(context, &client));
- return client;
- };
+ if (is_transactional) {
+ shared_ptr<KuduSession> session;
+ RETURN_NOT_OK(txn->CreateSession(&session));
+ sessions.emplace_back(std::move(session));
+ } else {
+ sessions.emplace_back(c->NewSession());
+ }
+ }
+
WriteResults write_results = GenerateWriteRows(
- client_factory,
+ sessions,
table_name,
FLAGS_use_upsert ? KuduWriteOperation::UPSERT : KuduWriteOperation::INSERT);
RETURN_NOT_OK(write_results.status);
client->SetLatestObservedTimestamp(write_results.latest_observed_timestamp);
+ if (txn) {
+ if (FLAGS_txn_commit) {
+ RETURN_NOT_OK(txn->Commit());
+ } else if (FLAGS_txn_rollback) {
+ RETURN_NOT_OK(txn->Rollback());
+ }
+ }
if (FLAGS_run_scan) {
// In case if no write errors encountered, run a table scan to make sure
- // the number of inserted rows matches the results of the scan.
+ // the number of inserted rows matches the results of the scan. In case
+ // of a transactional session, no rows are visible unless the transaction
+ // has been committed. It's non-conventional, but 'dirty' reads are not
+ // implemented for transactional sessions as of now: txn-related
+ // functionality is targeting "batch insert" use cases only.
uint64_t count = 0;
RETURN_NOT_OK(CountTableRows(client, table_name, &count));
+ const bool none_rows_visible =
+ (FLAGS_txn_start && !FLAGS_txn_commit) || FLAGS_txn_rollback;
+ const uint64_t expected_row_count =
+ none_rows_visible ? 0 : write_results.row_count;
cout << endl
<< "Scanner report" << endl
- << " expected rows: " << write_results.row_count << endl
+ << " expected rows: " << expected_row_count << endl
<< " actual rows : " << count << endl;
- if (count != write_results.row_count) {
+ if (count != expected_row_count) {
return Status::RuntimeError(Substitute(
- "Row count mismatch: expected $0, actual $1", write_results.row_count, count));
+ "Row count mismatch: expected $0, actual $1", expected_row_count, count));
}
@@ -828,7 +899,7 @@
if (FLAGS_run_cleanup) {
RETURN_NOT_OK(GenerateWriteRows(
- client_factory, table_name, KuduWriteOperation::DELETE).status);
+ sessions, table_name, KuduWriteOperation::DELETE).status);
}
if (is_auto_table && !FLAGS_keep_auto_table) {
@@ -841,7 +912,7 @@
}
Status TableScan(const RunnerContext& context) {
- client::sp::shared_ptr<KuduClient> client;
+ shared_ptr<KuduClient> client;
RETURN_NOT_OK(CreateKuduClient(context, &client));
const string& table_name = FindOrDie(context.required_args, kTableNameArg);
@@ -877,7 +948,7 @@
scoped_refptr<LogAnchorRegistry> registry(new LogAnchorRegistry());
// Bootstrap the tablet.
- shared_ptr<Tablet> tablet;
+ std::shared_ptr<Tablet> tablet;
scoped_refptr<Log> log;
ConsensusBootstrapInfo cbi;
RETURN_NOT_OK(tablet::BootstrapTablet(std::move(tmeta),
@@ -962,6 +1033,9 @@
.AddOptionalParameter("table_num_hash_partitions")
.AddOptionalParameter("table_num_range_partitions")
.AddOptionalParameter("table_num_replicas")
+ .AddOptionalParameter("txn_start")
+ .AddOptionalParameter("txn_commit")
+ .AddOptionalParameter("txn_rollback")
.AddOptionalParameter("use_client_per_thread")
.AddOptionalParameter("use_random")
.AddOptionalParameter("use_random_pk")