client-test: start txn on participant before inserting
fd7a40367a81555230eac62a1a8d1467433ed100 added pre-checks for
transactional inserts that the transaction exists on participants, but
was merged without rebasing to run ClientTest.TxnToken.
This starts the transaction on the participants before inserting as a
stopgap until we implement doing so automatically upon inserting for the
first time.
Change-Id: Ia513d90aadac4555d47174c91e4c51b4e158f350
Reviewed-on: http://gerrit.cloudera.org:8080/16757
Reviewed-by: Alexey Serbin <aserbin@cloudera.com>
Tested-by: Kudu Jenkins
diff --git a/src/kudu/client/client-test.cc b/src/kudu/client/client-test.cc
index 9d4c13a..647af55 100644
--- a/src/kudu/client/client-test.cc
+++ b/src/kudu/client/client-test.cc
@@ -102,6 +102,7 @@
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tablet/txn_coordinator.h"
+#include "kudu/tablet/txn_participant-test-util.h"
#include "kudu/transactions/transactions.pb.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/scanners.h"
@@ -109,6 +110,7 @@
#include "kudu/tserver/tablet_server_options.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/tserver/tserver.pb.h"
+#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/util/array_view.h"
#include "kudu/util/async_util.h"
#include "kudu/util/barrier.h"
@@ -194,6 +196,9 @@
using kudu::tablet::TabletReplica;
using kudu::transactions::TxnTokenPB;
using kudu::tserver::MiniTabletServer;
+using kudu::tserver::ParticipantOpPB;
+using kudu::tserver::ParticipantRequestPB;
+using kudu::tserver::ParticipantResponsePB;
using std::function;
using std::map;
using std::pair;
@@ -355,6 +360,24 @@
}
}
+ // TODO(awong): automatically begin transactions when trying to write to a
+ // transaction for the first time.
+ void BeginTxnOnParticipants(int64_t txn_id) {
+ for (auto i = 0; i < cluster_->num_tablet_servers(); ++i) {
+ auto* tm = cluster_->mini_tablet_server(i)->server()->tablet_manager();
+ vector<scoped_refptr<TabletReplica>> replicas;
+ tm->GetTabletReplicas(&replicas);
+ for (auto& r : replicas) {
+ // Skip partitions of the transaction status manager.
+ if (r->txn_coordinator()) continue;
+ ParticipantResponsePB resp;
+ WARN_NOT_OK(CallParticipantOp(
+ r.get(), txn_id, ParticipantOpPB::BEGIN_TXN, -1, &resp),
+ "failed to start transaction on participant");;
+ }
+ }
+ }
+
// TODO(aserbin): consider removing this method and update the scenarios it
// was used once the transaction orchestration is implemented
Status FinalizeCommitTransaction(int64_t txn_id) {
@@ -7030,6 +7053,9 @@
ASSERT_OK(serdes_txn->Serialize(&serdes_txn_token));
ASSERT_EQ(txn_token, serdes_txn_token);
+ // TODO(awong): remove once we register participants automatically before
+ // inserting.
+ BeginTxnOnParticipants(txn_id);
{
static constexpr auto kNumRows = 10;
shared_ptr<KuduSession> session;