KUDU-3265 txn-related options to 'kudu perf loadgen'

This patch adds the following txn-related options to the
'kudu perf loadgen' tool:
  (1) --txn_start
  (2) --txn_commit
  (3) --txn_rollback

With those, it's possible to insert rows into the table in the scope
of a multi-row transaction, and then (1) leave the transaction
as is, or (2) commit the transaction, or (3) roll back the transaction
explicitly.

Semantically, specifying --txn_commit or --txn_rollback implies setting
--txn_start as well.

This patch partially addresses KUDU-3265.  A follow-up patch will add
functionality to write and read transactional tokens in serialized form,
so that the newly introduced --txn_start option becomes more useful.
With that, it will be possible to start a new multi-row transaction
in one session of the kudu CLI tool, and then continue adding more rows
in another session of the kudu CLI tool, passing the transactional
token between them.

Change-Id: I73750af589d432c6ae973439264f55c08a327aa7
Reviewed-on: http://gerrit.cloudera.org:8080/17526
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <awong@cloudera.com>
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index f619251..528347f 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -2607,6 +2607,166 @@
   ASSERT_OK(WaitForNumTabletsOnTS(ts, expected_tablets, kTimeout));
 }
 
+// Run the loadgen with txn-related options.
+TEST_F(ToolTest, LoadgenTxnBasics) {
+  {
+    ExternalMiniClusterOptions opts;
+    // Prefer lighter cluster to speed up testing.
+    opts.num_tablet_servers = 1;
+    // Since txn-related functionality is not enabled by default for a while,
+    // a couple of flags should be overridden to allow running test scenarios
+    // in txn-enabled environment. Also, set the txn keepalive interval to a
+    // higher value: in the end of this scenario it's expected to have a couple
+    // of open transactions, but everything might run slower than expected with
+    // TSAN-enabled binaries run on inferior hardware or VMs.
+    opts.extra_master_flags.emplace_back("--txn_manager_enabled");
+    opts.extra_master_flags.emplace_back("--txn_manager_status_table_num_replicas=1");
+    opts.extra_tserver_flags.emplace_back("--enable_txn_system_client_init");
+    opts.extra_tserver_flags.emplace_back("--txn_keepalive_interval_ms=120000");
+    NO_FATALS(StartExternalMiniCluster(std::move(opts)));
+  }
+  const vector<string> base_args = {
+    "perf", "loadgen",
+    cluster_->master()->bound_rpc_addr().ToString(),
+    // Let's have a control over the number of rows inserted.
+    "--num_threads=2",
+    "--num_rows_per_thread=111",
+  };
+
+  // '--txn_start' works as expected when running against txn-enabled cluster.
+  {
+    vector<string> args(base_args);
+    args.emplace_back("--txn_start");
+    ASSERT_OK(RunKuduTool(args));
+
+    // An extra run to check for the number of visible rows: there should be
+    // none since the transaction isn't committed.
+    args.emplace_back("--run_scan");
+    string out;
+    ASSERT_OK(RunKuduTool(args, &out));
+    ASSERT_STR_MATCHES(out, "expected rows: 0");
+    ASSERT_STR_MATCHES(out, "actual rows  : 0");
+  }
+
+  // '--txn_start' and '--txn_commit' combination works as expected.
+  {
+    vector<string> args(base_args);
+    args.emplace_back("--txn_start");
+    args.emplace_back("--txn_commit");
+    ASSERT_OK(RunKuduTool(args));
+
+    // An extra run to check for the number of visible rows: all inserted rows
+    // should be visible now since the transaction is to be committed.
+    args.emplace_back("--run_scan");
+    string out;
+    ASSERT_OK(RunKuduTool(args, &out));
+    ASSERT_STR_MATCHES(out, "expected rows: 222");
+    ASSERT_STR_MATCHES(out, "actual rows  : 222");
+  }
+
+  // '--txn_start' and '--txn_rollback' combination works as expected.
+  {
+    vector<string> args(base_args);
+    args.emplace_back("--txn_start");
+    args.emplace_back("--txn_rollback");
+    ASSERT_OK(RunKuduTool(args));
+
+    // An extra run to check for the number of visible rows: there should be
+    // none since the transaction is to be rolled back.
+    args.emplace_back("--run_scan");
+    string out;
+    ASSERT_OK(RunKuduTool(args, &out));
+    ASSERT_STR_MATCHES(out, "expected rows: 0");
+    ASSERT_STR_MATCHES(out, "actual rows  : 0");
+  }
+
+  // Supplying '--txn_rollback' implies '--txn_start'.
+  {
+    vector<string> args(base_args);
+    args.emplace_back("--txn_rollback");
+    ASSERT_OK(RunKuduTool(args));
+
+    // An extra run to check for the number of visible rows: there should be
+    // none since the transaction is to be rolled back.
+    args.emplace_back("--run_scan");
+    string out;
+    ASSERT_OK(RunKuduTool(args, &out));
+    ASSERT_STR_MATCHES(out, "expected rows: 0");
+    ASSERT_STR_MATCHES(out, "actual rows  : 0");
+  }
+
+  // Supplying '--txn_commit' implies '--txn_start'.
+  {
+    vector<string> args(base_args);
+    args.emplace_back("--txn_commit");
+    ASSERT_OK(RunKuduTool(args));
+
+    // An extra run to check for the number of visible rows: all inserted rows
+    // should be visible now since the transaction is to be committed.
+    args.emplace_back("--run_scan");
+    string out;
+    ASSERT_OK(RunKuduTool(args, &out));
+    ASSERT_STR_MATCHES(out, "expected rows: 222");
+    ASSERT_STR_MATCHES(out, "actual rows  : 222");
+  }
+
+  // Mixing any txn-related flag and '--use_upsert' isn't possible since only
+  // INSERT/INSERT_IGNORE are supported as transactional operations for now.
+  {
+    for (const auto& f : { "--txn_start", "--txn_commit", "--txn_rollback" }) {
+      SCOPED_TRACE(Substitute("running with flag: $0", f));
+      vector<string> args(base_args);
+      args.emplace_back("--use_upsert");
+      args.emplace_back(f);
+      string err;
+      const auto s = RunKuduTool(args, nullptr, &err);
+      ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+      ASSERT_STR_CONTAINS(
+          err, "only inserts are supported as transactional operations");
+      ASSERT_STR_CONTAINS(err, "remove/unset the --use_upsert flag");
+    }
+  }
+
+  // Mixing '--txn_commit' and '--txn_rollback' doesn't make sense.
+  {
+    vector<string> args(base_args);
+    args.emplace_back("--txn_commit");
+    args.emplace_back("--txn_rollback");
+    string err;
+    const auto s = RunKuduTool(args, nullptr, &err);
+    ASSERT_TRUE(s.IsRuntimeError()) << s.ToString();
+    ASSERT_STR_CONTAINS(err, "both --txn_commit and --txn_rollback are set");
+    ASSERT_STR_CONTAINS(err, "unset one of those to resolve the conflict");
+  }
+
+  // Run 'kudu txn list' to see whether the transactions are in expected
+  // state after running the sub-scenarios above.
+  {
+    const vector<string> args = {
+      "txn", "list",
+      cluster_->master()->bound_rpc_addr().ToString(),
+      "--included_states=*",
+      "--columns=state",
+    };
+    string out;
+    ASSERT_OK(RunKuduTool(args, &out));
+    ASSERT_STR_MATCHES(out,
+R"(state
+-----------
+ OPEN
+ OPEN
+ COMMITTED
+ COMMITTED
+ ABORTED
+ ABORTED
+ ABORTED
+ ABORTED
+ COMMITTED
+ COMMITTED
+)");
+  }
+}
+
 // Test that a non-random workload results in the behavior we would expect when
 // running against an auto-generated range partitioned table.
 TEST_F(ToolTest, TestNonRandomWorkloadLoadgen) {
diff --git a/src/kudu/tools/tool_action_perf.cc b/src/kudu/tools/tool_action_perf.cc
index 1129986..0efce37 100644
--- a/src/kudu/tools/tool_action_perf.cc
+++ b/src/kudu/tools/tool_action_perf.cc
@@ -237,7 +237,9 @@
 using kudu::client::KuduSession;
 using kudu::client::KuduTable;
 using kudu::client::KuduTableCreator;
+using kudu::client::KuduTransaction;
 using kudu::client::KuduWriteOperation;
+using kudu::client::sp::shared_ptr;
 using kudu::clock::LogicalClock;
 using kudu::consensus::ConsensusBootstrapInfo;
 using kudu::consensus::ConsensusMetadata;
@@ -254,7 +256,6 @@
 using std::mutex;
 using std::numeric_limits;
 using std::ostringstream;
-using std::shared_ptr;
 using std::string;
 using std::thread;
 using std::unique_ptr;
@@ -366,12 +367,25 @@
 DEFINE_bool(use_upsert, false,
             "Whether to use UPSERT instead of INSERT to store the generated "
             "data into the table");
+DEFINE_bool(txn_start, false,
+            "Whether the generated rows are inserted in the context of a "
+            "multi-row transaction. For now, only supported if configured "
+            "to insert rows (i.e. UPDATE and DELETE are not yet supported). "
+            "Unless --txn_commit or --txn_rollback is specified, the started "
+            "transaction is left 'as is' (i.e. neither commit, nor rollback "
+            "is performed) once the tool completes its operations.");
+DEFINE_bool(txn_commit, false,
+            "Whether to commit the multi-row transaction which contains all "
+            "the inserted rows. Setting --txn_commit=true implies setting "
+            "--txn_start=true as well.");
+DEFINE_bool(txn_rollback, false,
+            "Whether to rollback the multi-row transaction which contains all "
+            "the inserted rows. Setting --txn_rollback=true implies setting "
+            "--txn_start=true as well.");
 
 namespace kudu {
 namespace tools {
 
-using ClientFactory = std::function<client::sp::shared_ptr<KuduClient>()>;
-
 namespace {
 
 bool ValidatePartitionFlags() {
@@ -387,6 +401,24 @@
 }
 GROUP_FLAG_VALIDATOR(partition_flags, &ValidatePartitionFlags);
 
+// Reject inconsistent combinations of the txn-related flags of 'perf loadgen'.
+bool ValidateTxnFlags() {
+  if (FLAGS_use_upsert &&
+      (FLAGS_txn_start || FLAGS_txn_commit || FLAGS_txn_rollback)) {
+    LOG(ERROR) << "only inserts are supported as transactional operations "
+                  "for now: either remove/unset the --use_upsert flag or "
+                  "remove/unset txn-related flags";
+    return false;
+  }
+  if (FLAGS_txn_rollback && FLAGS_txn_commit) {
+    LOG(ERROR) << "both --txn_commit and --txn_rollback are set: "
+                  "unset one of those to resolve the conflict";
+    return false;
+  }
+  return true;
+}
+GROUP_FLAG_VALIDATOR(txn_flags, &ValidateTxnFlags);
+
 const char* OpTypeToString(KuduWriteOperation::Type op_type) {
   switch (op_type) {
     case KuduWriteOperation::INSERT:
@@ -568,7 +600,7 @@
   uint64_t latest_observed_timestamp = 0;
 };
 
-WriteResults GeneratorThread(const client::sp::shared_ptr<KuduClient>& client,
+WriteResults GeneratorThread(const shared_ptr<KuduSession>& session,
                              const string& table_name,
                              size_t gen_idx,
                              KuduWriteOperation::Type op_type) {
@@ -589,7 +621,8 @@
 
   const size_t flush_per_n_rows = FLAGS_flush_per_n_rows;
   const uint64_t gen_seq_start = FLAGS_seq_start;
-  client::sp::shared_ptr<KuduSession> session(client->NewSession());
+  auto* client = session->client();
+  DCHECK(client);
   int64_t idx = 0;
 
   auto generator = [&]() {
@@ -607,7 +640,7 @@
         flush_per_n_rows == 0 ? KuduSession::AUTO_FLUSH_BACKGROUND
                               : KuduSession::MANUAL_FLUSH));
 
-    client::sp::shared_ptr<KuduTable> table;
+    shared_ptr<KuduTable> table;
     RETURN_NOT_OK(client->OpenTable(table_name, &table));
 
     // Planning for non-intersecting ranges for different generator threads
@@ -671,22 +704,24 @@
   return results;
 }
 
-WriteResults GenerateWriteRows(const ClientFactory& client_factory,
+WriteResults GenerateWriteRows(const vector<shared_ptr<KuduSession>>& sessions,
                                const string& table_name,
                                KuduWriteOperation::Type op_type) {
   DCHECK(op_type == KuduWriteOperation::INSERT ||
          op_type == KuduWriteOperation::DELETE ||
          op_type == KuduWriteOperation::UPSERT);
 
-  const size_t gen_num = FLAGS_num_threads;
+  const size_t gen_num = sessions.size();
   vector<WriteResults> results(gen_num);
   vector<thread> threads;
+  threads.reserve(gen_num);
   Stopwatch sw(Stopwatch::ALL_THREADS);
   sw.start();
   for (size_t i = 0; i < gen_num; ++i) {
-    auto client = client_factory();
     threads.emplace_back(
-        [=, &results]() { results[i] = GeneratorThread(client, table_name, i, op_type); });
+      [=, &results]() {
+        results[i] = GeneratorThread(sessions[i], table_name, i, op_type);
+      });
   }
   for (auto& t : threads) {
     t.join();
@@ -731,7 +766,7 @@
 
 // Fetch all rows from the table with the specified name; iterate over them
 // and output their total count.
-Status CountTableRows(const client::sp::shared_ptr<KuduClient>& client,
+Status CountTableRows(const shared_ptr<KuduClient>& client,
                       const string& table_name, uint64_t* count) {
   TableScanner scanner(client, table_name);
   scanner.SetReadMode(KuduScanner::ReadMode::READ_YOUR_WRITES);
@@ -744,15 +779,16 @@
 }
 
 Status TestLoadGenerator(const RunnerContext& context) {
-  client::sp::shared_ptr<KuduClient> client;
+  shared_ptr<KuduClient> client;
   RETURN_NOT_OK(CreateKuduClient(context, &client));
 
+  const size_t gen_num = FLAGS_num_threads;
   string table_name;
   bool is_auto_table = false;
   if (!FLAGS_table_name.empty()) {
     table_name = FLAGS_table_name;
   } else {
-    static const string kKeyColumnName = "key";
+    constexpr const char* const kKeyColumnName = "key";
 
     // The auto-created table case.
     is_auto_table = true;
@@ -777,7 +813,7 @@
       // tablets. In case we're inserting in random mode, use unbounded range
       // partitioning, so the table has key coverage of the entire keyspace.
       const int64_t total_inserted_span =
-          SpanPerThread(KuduSchema::ToSchema(schema).num_key_columns()) * FLAGS_num_threads;
+          SpanPerThread(KuduSchema::ToSchema(schema).num_key_columns()) * gen_num;
       const int64_t span_per_range =
           total_inserted_span / FLAGS_table_num_range_partitions;
       table_creator->set_range_partition_columns({ kKeyColumnName });
@@ -797,30 +833,65 @@
   cout << "Using " << (is_auto_table ? "auto-created " : "")
        << "table '" << table_name << "'" << endl;
 
-  ClientFactory client_factory = [&]() {
-    if (!FLAGS_use_client_per_thread) {
-      return client;
+  const bool is_transactional =
+      FLAGS_txn_start || FLAGS_txn_rollback || FLAGS_txn_commit;
+  shared_ptr<KuduTransaction> txn;
+  if (is_transactional) {
+    RETURN_NOT_OK(client->NewTransaction(&txn));
+  }
+
+  // Create a session per generator thread. A KuduSession object keeps a
+  // reference to its client handle, so there is no need to keep references
+  // to the newly created client objects themselves.
+  vector<shared_ptr<KuduSession>> sessions;
+  sessions.reserve(gen_num);
+  for (size_t i = 0; i < gen_num; ++i) {
+    shared_ptr<KuduClient> c;
+    if (FLAGS_use_client_per_thread) {
+      RETURN_NOT_OK(CreateKuduClient(context, &c));
+    } else {
+      c = client;
     }
-    client::sp::shared_ptr<KuduClient> client;
-    CHECK_OK(CreateKuduClient(context, &client));
-    return client;
-  };
+    if (is_transactional) {
+      shared_ptr<KuduSession> session;
+      RETURN_NOT_OK(txn->CreateSession(&session));
+      sessions.emplace_back(std::move(session));
+    } else {
+      sessions.emplace_back(c->NewSession());
+    }
+  }
+
   WriteResults write_results = GenerateWriteRows(
-      client_factory,
+      sessions,
       table_name,
       FLAGS_use_upsert ? KuduWriteOperation::UPSERT : KuduWriteOperation::INSERT);
   RETURN_NOT_OK(write_results.status);
   client->SetLatestObservedTimestamp(write_results.latest_observed_timestamp);
+  if (txn) {
+    if (FLAGS_txn_commit) {
+      RETURN_NOT_OK(txn->Commit());
+    } else if (FLAGS_txn_rollback) {
+      RETURN_NOT_OK(txn->Rollback());
+    }
+  }
   if (FLAGS_run_scan) {
     // In case if no write errors encountered, run a table scan to make sure
-    // the number of inserted rows matches the results of the scan.
+    // the number of inserted rows matches the results of the scan. In case
+    // of a transactional session, no rows are visible unless the transaction
+    // has been committed. It's non-conventional, but 'dirty' reads are not
+    // implemented for transactional sessions as of now: txn-related
+    // functionality is targeting "batch insert" use cases only.
     uint64_t count = 0;
     RETURN_NOT_OK(CountTableRows(client, table_name, &count));
+    const bool none_rows_visible =
+        (FLAGS_txn_start && !FLAGS_txn_commit) || FLAGS_txn_rollback;
+    const uint64_t expected_row_count =
+        none_rows_visible ? 0 : write_results.row_count;
     cout << endl
          << "Scanner report" << endl
-         << "  expected rows: " << write_results.row_count << endl
+         << "  expected rows: " << expected_row_count << endl
          << "  actual rows  : " << count << endl;
-    if (count != write_results.row_count) {
+    if (count != expected_row_count) {
       return Status::RuntimeError(Substitute(
-          "Row count mismatch: expected $0, actual $1", write_results.row_count, count));
+          "Row count mismatch: expected $0, actual $1", expected_row_count, count));
     }
@@ -828,7 +899,7 @@
 
   if (FLAGS_run_cleanup) {
     RETURN_NOT_OK(GenerateWriteRows(
-        client_factory, table_name, KuduWriteOperation::DELETE).status);
+        sessions, table_name, KuduWriteOperation::DELETE).status);
   }
 
   if (is_auto_table && !FLAGS_keep_auto_table) {
@@ -841,7 +912,7 @@
 }
 
 Status TableScan(const RunnerContext& context) {
-  client::sp::shared_ptr<KuduClient> client;
+  shared_ptr<KuduClient> client;
   RETURN_NOT_OK(CreateKuduClient(context, &client));
 
   const string& table_name = FindOrDie(context.required_args, kTableNameArg);
@@ -877,7 +948,7 @@
   scoped_refptr<LogAnchorRegistry> registry(new LogAnchorRegistry());
 
   // Bootstrap the tablet.
-  shared_ptr<Tablet> tablet;
+  std::shared_ptr<Tablet> tablet;
   scoped_refptr<Log> log;
   ConsensusBootstrapInfo cbi;
   RETURN_NOT_OK(tablet::BootstrapTablet(std::move(tmeta),
@@ -962,6 +1033,9 @@
       .AddOptionalParameter("table_num_hash_partitions")
       .AddOptionalParameter("table_num_range_partitions")
       .AddOptionalParameter("table_num_replicas")
+      .AddOptionalParameter("txn_start")
+      .AddOptionalParameter("txn_commit")
+      .AddOptionalParameter("txn_rollback")
       .AddOptionalParameter("use_client_per_thread")
       .AddOptionalParameter("use_random")
       .AddOptionalParameter("use_random_pk")