KUDU-2612 add perf scenario to TxnWriteOpsITest

This patch adds the WriteOpPerf scenario to TxnWriteOpsITest.  The new
scenario is meant to evaluate the --tablet_max_pending_txn_write_ops
setting for tablet servers: it runs for a short time and counts the
number of Write RPCs completed in the context of a transactional
session.  The scenario focuses on single-row write operations to
pinpoint the latency of processing txn write operations while
transaction participants are being registered.  Probably, the current
approach to locking the TxnOpDispatcher's queue while submitting the
accumulated operations isn't optimal.  Apparently, the exponential
back-off timing built into the client's RPC retry logic is also an
important factor: I saw significant deviation in the number of
completed RPCs from run to run.

Below are the results averaged over 100 runs of the benchmark scenario
for various values of --max_pending_txn_write_ops, with the following
settings fixed:
  --prime_connections_to_tservers=true
  --clients=8
  --sessions_per_client=1
  --benchmark_run_time_ms=50

I used the following script to get the accumulated results for
writes in a transactional context (the script prints the sum over
100 runs; the averages below are that sum divided by 100):
  for i in {0..99}; do
    ./bin/txn_write_ops-itest --gtest_filter='*WriteOpPerf' \
        --max_pending_txn_write_ops=<X> 2>&1 | grep 'write RPCs' | \
        awk '{print $9}'; done | \
        awk 'BEGIN {sum=0} {sum += $0} END {print sum}'

RELEASE build:
  --max_pending_txn_write_ops=0  : 442.13 RPCs
  --max_pending_txn_write_ops=2  : 494.33 RPCs
  --max_pending_txn_write_ops=5  : 471.90 RPCs
  --max_pending_txn_write_ops=10 : 490.22 RPCs
  --max_pending_txn_write_ops=20 : 469.21 RPCs

DEBUG   build:
  --max_pending_txn_write_ops=0  : 83.74 RPCs
  --max_pending_txn_write_ops=2  : 98.18 RPCs
  --max_pending_txn_write_ops=5  : 95.23 RPCs
  --max_pending_txn_write_ops=10 : 98.12 RPCs
  --max_pending_txn_write_ops=20 : 94.40 RPCs

I also compared the performance of transactional vs non-transactional
writes for various benchmark run times in the RELEASE build; for the
transactional writes I used the --max_pending_txn_write_ops=2 setting:

50ms:
  non-transactional:    588.33 RPCs (11767 req/sec)
      transactional:    487.82 RPCs ( 9756 req/sec)

3000ms:
  non-transactional:  40041.63 RPCs (13347 req/sec)
      transactional:  39759.07 RPCs (13253 req/sec)

8000ms:
  non-transactional: 106832.37 RPCs (13354 req/sec)
      transactional: 105922.65 RPCs (13240 req/sec)

Change-Id: I0370dbb289a4e1cfc154205ae92e13da510682b4
Reviewed-on: http://gerrit.cloudera.org:8080/17105
Tested-by: Alexey Serbin <aserbin@cloudera.com>
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
diff --git a/src/kudu/integration-tests/txn_write_ops-itest.cc b/src/kudu/integration-tests/txn_write_ops-itest.cc
index 0dcf553..cb7a8f0 100644
--- a/src/kudu/integration-tests/txn_write_ops-itest.cc
+++ b/src/kudu/integration-tests/txn_write_ops-itest.cc
@@ -26,6 +26,7 @@
 #include <map>
 #include <memory>
 #include <mutex>
+#include <numeric>
 #include <ostream>
 #include <set>
 #include <string>
@@ -36,7 +37,7 @@
 #include <vector>
 
 #include <boost/optional/optional.hpp>
-#include <gflags/gflags_declare.h>
+#include <gflags/gflags.h>
 #include <glog/logging.h>
 #include <gtest/gtest.h>
 
@@ -52,6 +53,7 @@
 #include "kudu/consensus/consensus.pb.h"
 #include "kudu/consensus/consensus.proxy.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stl_util.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -83,8 +85,10 @@
 using kudu::client::KuduClient;
 using kudu::client::KuduClientBuilder;
 using kudu::client::KuduColumnSchema;
+using kudu::client::KuduDeleteIgnore;
 using kudu::client::KuduError;
 using kudu::client::KuduInsert;
+using kudu::client::KuduInsertIgnore;
 using kudu::client::KuduScanBatch;
 using kudu::client::KuduScanner;
 using kudu::client::KuduSchema;
@@ -114,6 +118,21 @@
 using std::vector;
 using strings::Substitute;
 
+// The run-time flags below are for TxnWriteOpsITest.WriteOpPerf scenario.
+DEFINE_bool(prime_connections_to_tservers, true,
+            "whether to open connections to tablet servers prior to sending "
+            "transactional write operations");
+DEFINE_uint32(clients, 8, "number of Kudu clients to run");
+DEFINE_uint32(sessions_per_client, 1,
+              "number of concurrent sessions per Kudu client: "
+              "there will be --clients * --sessions_per_client concurrent "
+              "writer threads in total, i.e. one writer thread per session");
+DEFINE_uint32(benchmark_run_time_ms, 50,
+              "time interval to run the benchmark, in milliseconds");
+DEFINE_uint32(max_pending_txn_write_ops, 10,
+              "setting for tserver's --tablet_max_pending_txn_write_ops flag");
+DEFINE_bool(txn_enabled, true, "whether to use transactional sessions");
+
 DECLARE_bool(tserver_txn_write_op_handling_enabled);
 DECLARE_bool(txn_manager_enabled);
 DECLARE_bool(txn_manager_lazily_initialized);
@@ -140,6 +159,20 @@
   return op;
 }
 
+unique_ptr<KuduInsertIgnore> BuildInsertIgnore(KuduTable* table, int64_t key) {
+  unique_ptr<KuduInsertIgnore> op(table->NewInsertIgnore());
+  KuduPartialRow* row = op->mutable_row();
+  CHECK_OK(row->SetInt64(0, key));
+  return op;
+}
+
+unique_ptr<KuduDeleteIgnore> BuildDeleteIgnore(KuduTable* table, int64_t key) {
+  unique_ptr<KuduDeleteIgnore> op(table->NewDeleteIgnore());
+  KuduPartialRow* row = op->mutable_row();
+  CHECK_OK(row->SetInt64(0, key));
+  return op;
+}
+
 int64_t GetTxnId(const shared_ptr<KuduTransaction>& txn) {
   string txn_token;
   CHECK_OK(txn->Serialize(&txn_token));
@@ -418,6 +451,162 @@
   ASSERT_EQ(row_count, count);
 }
 
+// This scenario runs a benchmark to measure the rate of transactional write
+// operations. Its purpose is to evaluate the effect of the tablet servers'
+// --tablet_max_pending_txn_write_ops flag setting.
+TEST_F(TxnWriteOpsITest, WriteOpPerf) {
+  const vector<string> ts_flags = {
+    Substitute("--tablet_max_pending_txn_write_ops=$0",
+               FLAGS_max_pending_txn_write_ops),
+  };
+  const vector<string> master_flags = {
+    // Enable TxnManager in Kudu masters.
+    // TODO(aserbin): remove this customization once the flag is 'on' by default
+    "--txn_manager_enabled=true",
+
+    // Scenarios based on this test fixture assume the txn status table
+    // is created at start, not on first transaction-related operation.
+    "--txn_manager_lazily_initialized=false",
+  };
+  NO_FATALS(StartCluster(ts_flags, master_flags, kNumTabletServers));
+  NO_FATALS(Prepare());
+
+  const auto num_clients = FLAGS_clients;
+  vector<shared_ptr<KuduClient>> clients;
+  clients.reserve(num_clients);
+  for (auto i = 0; i < num_clients; ++i) {
+    KuduClientBuilder b;
+    b.default_admin_operation_timeout(kTimeout);
+    b.default_rpc_timeout(kTimeout);
+    shared_ptr<KuduClient> c;
+    ASSERT_OK(cluster_->CreateClient(&b, &c));
+    clients.emplace_back(std::move(c));
+  }
+
+  const bool txn_enabled = FLAGS_txn_enabled;
+  const auto num_sessions = num_clients * FLAGS_sessions_per_client;
+  vector<shared_ptr<KuduTransaction>> txns;
+  txns.reserve(num_sessions);
+  vector<shared_ptr<KuduSession>> sessions;
+  sessions.reserve(num_sessions);
+  for (auto i = 0; i < num_sessions; ++i) {
+    const auto client_idx = i % num_clients;
+    auto& c = clients[client_idx];
+    shared_ptr<KuduSession> s;
+    if (!txn_enabled) {
+      s = c->NewSession();
+    } else {
+      shared_ptr<KuduTransaction> txn;
+      ASSERT_OK(c->NewTransaction(&txn));
+      ASSERT_OK(txn->CreateSession(&s));
+      txns.emplace_back(std::move(txn));
+    }
+    ASSERT_NE(nullptr, s.get());
+    ASSERT_OK(s->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+    sessions.emplace_back(std::move(s));
+  }
+
+  // Run multiple writer threads (one thread per session), where every thread
+  // sends as many write operations as it can. For now, using INSERT operations:
+  // INSERT and INSERT_IGNORE are the only write operations supported by
+  // multi-row transaction sessions.
+  atomic<bool> done = false;
+  Barrier barrier(num_sessions + 1);
+  vector<thread> writers;
+  writers.reserve(num_sessions);
+  vector<Status> session_statuses(num_sessions);
+  vector<size_t> row_counters(num_sessions, 0);
+  const bool prime_connections = FLAGS_prime_connections_to_tservers;
+  for (auto session_idx = 0; session_idx < num_sessions; ++session_idx) {
+    writers.emplace_back([&, session_idx] {
+      auto& session = sessions[session_idx];
+      const auto client_idx = session_idx % num_clients;
+      auto& c = clients[client_idx];
+      shared_ptr<KuduTable> table;
+      auto s = c->OpenTable(kTableName, &table);
+      if (PREDICT_FALSE(!s.ok())) {
+        session_statuses[session_idx] = s;
+        return;
+      }
+      if (prime_connections) {
+        // If requested, send several INSERT_IGNORE/DELETE_IGNORE operations
+        // to open connections to all tablet servers in the cluster.
+        // Number of rows is set to have at least one row per every tablet:
+        // it's a hash-partitioned table with kNumPartitions tablets.
+        constexpr const auto kNumPreliminaryRows = kNumPartitions * 10;
+        shared_ptr<KuduSession> priming_session = c->NewSession();
+        CHECK_OK(priming_session->SetFlushMode(KuduSession::AUTO_FLUSH_SYNC));
+        for (auto i = 0; i < kNumPreliminaryRows; ++i) {
+          unique_ptr<KuduInsertIgnore> op = BuildInsertIgnore(table.get(), i);
+          auto s = priming_session->Apply(op.release());
+          if (PREDICT_FALSE(!s.ok())) {
+            session_statuses[session_idx] = s;
+            return;
+          }
+        }
+        for (auto i = 0; i < kNumPreliminaryRows; ++i) {
+          unique_ptr<KuduDeleteIgnore> op = BuildDeleteIgnore(table.get(), i);
+          auto s = priming_session->Apply(op.release());
+          if (PREDICT_FALSE(!s.ok())) {
+            session_statuses[session_idx] = s;
+            return;
+          }
+        }
+      }
+      size_t op_idx = 0;
+      barrier.Wait();
+      while (!done) {
+        int64_t key = num_sessions * op_idx + session_idx;
+        unique_ptr<KuduInsert> op(BuildInsert(table.get(), key));
+        auto s = session->Apply(op.release());
+        if (PREDICT_FALSE(!s.ok())) {
+          session_statuses[session_idx] = s;
+          return;
+        }
+        // Every Write RPC results in one row because of AUTO_FLUSH_SYNC mode.
+        ++row_counters[session_idx];
+        ++op_idx;
+      }
+    });
+  }
+  const auto run_time = MonoDelta::FromMilliseconds(FLAGS_benchmark_run_time_ms);
+  barrier.Wait(); // start writers
+  SleepFor(run_time);
+  done = true;    // stop writers
+  std::for_each(writers.begin(), writers.end(), [](thread& t) { t.join(); });
+
+  NO_FATALS(cluster_->AssertNoCrashes());
+  for (auto i = 0; i < session_statuses.size(); ++i) {
+    SCOPED_TRACE(Substitute("session index idx $0", i));
+    const auto& s = session_statuses[i];
+    ASSERT_OK(s);
+  }
+  for (auto& txn : txns) {
+    ASSERT_OK(txn->Commit());
+  }
+  // Sanity check: make sure all the transactions are reported as complete.
+  for (auto& txn : txns) {
+    bool is_complete = false;
+    Status completion_status;
+    ASSERT_OK(txn->IsCommitComplete(&is_complete, &completion_status));
+    ASSERT_TRUE(is_complete);
+  }
+
+  const size_t rows_total = std::accumulate(
+      row_counters.begin(), row_counters.end(), 0UL);
+  LOG(INFO) << Substitute("$0write RPCs completed: $1",
+                          txn_enabled ? "txn " : "", rows_total);
+  LOG(INFO) << Substitute(
+      "$0write RPC rate: $1 req/sec",
+      txn_enabled ? "txn " : "",
+      static_cast<double>(rows_total) / run_time.ToSeconds());
+
+  // Another sanity check: make sure all the rows have been persisted.
+  size_t count;
+  ASSERT_OK(CountRows(table_.get(), &count));
+  ASSERT_EQ(rows_total, count);
+}
+
 // Send a write operation to a tablet server in the context of non-existent
 // transaction. The server should respond back with appropriate error status.
 TEST_F(TxnWriteOpsITest, WriteOpForNonExistentTxn) {