blob: 46b4a7610ff8d31ebb1d6787881c7e61c187f8d4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <sys/types.h>
#include <cstdint>
#include <cstring>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/client/client.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/scan_predicate.h"
#include "kudu/client/value.h"
#include "kudu/clock/hybrid_clock.h"
#include "kudu/common/timestamp.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/stringpiece.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/master/master.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tools/tool_action.h"
#include "kudu/tools/tool_action_common.h"
#include "kudu/transactions/transactions.pb.h"
#include "kudu/transactions/txn_status_tablet.h"
#include "kudu/transactions/txn_system_client.h"
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/string_case.h"
DECLARE_string(columns);
DEFINE_int64(min_txn_id, -1, "Inclusive minimum transaction ID to display, or -1 for no minimum.");
DEFINE_int64(max_txn_id, -1, "Inclusive maximum transaction ID to display, or -1 for no maximum.");
DEFINE_string(included_states, "open,abort_in_progress,commit_in_progress,finalize_in_progress",
"Comma-separated list of transaction states to display. Supported states "
"are 'open', 'abort_in_progress', 'aborted', 'commit_in_progress', "
"'finalize_in_progress', and 'committed', or '*' for all states. By default, shows "
"currently active transactions.");
DECLARE_int64(timeout_ms);
DECLARE_string(sasl_protocol_name);
using kudu::client::sp::shared_ptr;
using kudu::client::KuduClient;
using kudu::client::KuduPredicate;
using kudu::client::KuduScanner;
using kudu::client::KuduScanBatch;
using kudu::client::KuduTable;
using kudu::client::KuduValue;
using kudu::clock::HybridClock;
using kudu::tablet::TxnMetadataPB;
using kudu::transactions::TxnStatePB;
using kudu::transactions::TxnStatusEntryPB;
using kudu::transactions::TxnStatusTablet;
using kudu::transactions::TxnSystemClient;
using kudu::tserver::ParticipantOpPB;
using std::cout;
using std::string;
using std::unique_ptr;
using std::unordered_map;
using std::unordered_set;
using std::vector;
using strings::Substitute;
using strings::Split;
namespace kudu {
namespace tools {
namespace {
enum class ListTxnsField : int {
kTxnId,
kUser,
kState,
kCommitDateTime,
kCommitHybridTime,
kStartDatetime,
kLastTransitionDatetime,
};
const unordered_map<string, ListTxnsField> kStrToTxnField {
{ "txn_id", ListTxnsField::kTxnId },
{ "user", ListTxnsField::kUser },
{ "state", ListTxnsField::kState },
{ "commit_datetime", ListTxnsField::kCommitDateTime },
{ "commit_hybridtime", ListTxnsField::kCommitHybridTime },
{ "start_datetime", ListTxnsField::kStartDatetime },
{ "last_transition_datetime", ListTxnsField::kLastTransitionDatetime },
};
enum class ParticipantField : int {
kTabletId,
kAborted,
kFlushedCommittedMrs,
kBeginCommitDateTime,
kBeginCommitHybridTime,
kCommitDateTime,
kCommitHybridTime,
};
constexpr const char* kParticipantPrefix = "participant_";
const unordered_map<string, ParticipantField> kStrToParticipantFields {
{ "tablet_id", ParticipantField::kTabletId },
{ "aborted", ParticipantField::kAborted },
{ "flushed_committed_mrs", ParticipantField::kFlushedCommittedMrs },
{ "begin_commit_datetime", ParticipantField::kBeginCommitDateTime },
{ "begin_commit_hybridtime", ParticipantField::kBeginCommitHybridTime },
{ "commit_datetime", ParticipantField::kCommitDateTime },
{ "commit_hybridtime", ParticipantField::kCommitHybridTime },
};
constexpr const char* kTxnIdArg = "txn_id";
const unordered_map<string, TxnStatePB> kStrToState = {
{ "open", TxnStatePB::OPEN },
{ "abort_in_progress", TxnStatePB::ABORT_IN_PROGRESS },
{ "aborted", TxnStatePB::ABORTED },
{ "commit_in_progress", TxnStatePB::COMMIT_IN_PROGRESS },
{ "finalize_in_progress", TxnStatePB::FINALIZE_IN_PROGRESS },
{ "committed", TxnStatePB::COMMITTED },
};
unordered_set<TxnStatePB> kIncludedStates;
bool ValidateStatesList(const char* /*flag_name*/, const string& flag_val) {
if (flag_val == "*") {
for (const auto& [_, state] : kStrToState) {
EmplaceOrDie(&kIncludedStates, state);
}
return true;
}
vector<string> included_states_list = Split(flag_val, ",", strings::SkipEmpty());
unordered_set<TxnStatePB> included_states;
for (auto state_lower : included_states_list) {
ToLowerCase(&state_lower);
auto* state = FindOrNull(kStrToState, state_lower);
if (state) {
EmplaceIfNotPresent(&included_states, *state);
} else {
LOG(ERROR) << Substitute("Unexpected state provided: $0", state_lower);
return false;
}
}
kIncludedStates = std::move(included_states);
return true;
}
DEFINE_validator(included_states, &ValidateStatesList);
string HybridTimeToDateTime(int64_t ht) {
const auto physical_micros = HybridClock::GetPhysicalValueMicros(Timestamp(ht));
time_t physical_secs(physical_micros / 1000000);
char buf[kFastToBufferSize];
return FastTimeToBuffer(physical_secs, buf);
}
Status GetFields(vector<string>* txn_field_names, vector<ListTxnsField>* txn_fields,
vector<string>* participant_field_names = nullptr,
vector<ParticipantField>* participant_fields = nullptr) {
for (const auto& column : strings::Split(FLAGS_columns, ",", strings::SkipEmpty())) {
string column_lower;
ToLowerCase(column.ToString(), &column_lower);
if (participant_field_names && participant_fields) {
// If the field is prefixed with "participant_", get the suffix and look
// through the participant fields.
const char* participant_field_suffix =
strnprefix(column_lower.c_str(), column_lower.size(),
kParticipantPrefix, strlen(kParticipantPrefix));
if (participant_field_suffix) {
auto* field = FindOrNull(kStrToParticipantFields, participant_field_suffix);
if (field) {
participant_fields->emplace_back(*field);
participant_field_names->emplace_back(participant_field_suffix);
continue;
}
}
}
// Otherwise, check the transaction fields.
auto* field = FindOrNull(kStrToTxnField, column_lower);
if (!field) {
return Status::InvalidArgument("unknown column specified in --columns", column);
}
txn_fields->emplace_back(*field);
txn_field_names->emplace_back(std::move(column_lower));
}
return Status::OK();
}
void AddTxnStatusRow(const vector<ListTxnsField>& fields, int64_t txn_id,
const TxnStatusEntryPB& txn_entry_pb, DataTable* data_table) {
string commit_ts_ht_str = "<none>";
string commit_ts_date_str = "<none>";
vector<string> col_vals;
for (const auto& field : fields) {
switch (field) {
case ListTxnsField::kTxnId:
col_vals.emplace_back(SimpleItoa(txn_id));
break;
case ListTxnsField::kUser:
col_vals.emplace_back(txn_entry_pb.user());
break;
case ListTxnsField::kState:
col_vals.emplace_back(TxnStatePB_Name(txn_entry_pb.state()));
break;
case ListTxnsField::kCommitDateTime:
col_vals.emplace_back(txn_entry_pb.has_commit_timestamp() ?
HybridTimeToDateTime(txn_entry_pb.commit_timestamp()) : "<none>");
break;
case ListTxnsField::kCommitHybridTime:
col_vals.emplace_back(txn_entry_pb.has_commit_timestamp() ?
HybridClock::StringifyTimestamp(Timestamp(txn_entry_pb.commit_timestamp())) : "<none>");
break;
case ListTxnsField::kStartDatetime: {
char buf[kFastToBufferSize];
col_vals.emplace_back(txn_entry_pb.has_start_timestamp() ?
FastTimeToBuffer(txn_entry_pb.start_timestamp(), buf) : "<none>");
break;
}
case ListTxnsField::kLastTransitionDatetime: {
char buf[kFastToBufferSize];
col_vals.emplace_back(txn_entry_pb.has_last_transition_timestamp() ?
FastTimeToBuffer(txn_entry_pb.last_transition_timestamp(), buf) : "<none>");
break;
}
}
}
data_table->AddRow(std::move(col_vals));
}
} // anonymous namespace
Status ListTxns(const RunnerContext& context) {
shared_ptr<KuduClient> client;
RETURN_NOT_OK(CreateKuduClient(context, &client));
shared_ptr<KuduTable> table;
RETURN_NOT_OK(client->OpenTable(TxnStatusTablet::kTxnStatusTableName, &table));
vector<string> field_names;
vector<ListTxnsField> fields;
RETURN_NOT_OK(GetFields(&field_names, &fields));
KuduScanner scanner(table.get());
RETURN_NOT_OK(scanner.SetFaultTolerant());
RETURN_NOT_OK(scanner.SetSelection(KuduClient::LEADER_ONLY));
auto* pred = table->NewComparisonPredicate(
TxnStatusTablet::kEntryTypeColName,
KuduPredicate::EQUAL, KuduValue::FromInt(TxnStatusTablet::TRANSACTION));
RETURN_NOT_OK(scanner.AddConjunctPredicate(pred));
if (FLAGS_min_txn_id > 0) {
auto* min_pred = table->NewComparisonPredicate(
TxnStatusTablet::kTxnIdColName,
KuduPredicate::GREATER_EQUAL, KuduValue::FromInt(FLAGS_min_txn_id));
RETURN_NOT_OK(scanner.AddConjunctPredicate(min_pred));
}
if (FLAGS_max_txn_id > 0) {
auto* max_pred = table->NewComparisonPredicate(
TxnStatusTablet::kTxnIdColName,
KuduPredicate::LESS_EQUAL, KuduValue::FromInt(FLAGS_max_txn_id));
RETURN_NOT_OK(scanner.AddConjunctPredicate(max_pred));
}
RETURN_NOT_OK(scanner.Open());
DataTable data_table(field_names);
KuduScanBatch batch;
while (scanner.HasMoreRows()) {
RETURN_NOT_OK(scanner.NextBatch(&batch));
for (const auto& iter : batch) {
Slice metadata_bytes;
string metadata;
RETURN_NOT_OK(iter.GetString(TxnStatusTablet::kMetadataColName, &metadata_bytes));
TxnStatusEntryPB txn_entry_pb;
RETURN_NOT_OK(pb_util::ParseFromArray(&txn_entry_pb, metadata_bytes.data(),
metadata_bytes.size()));
if (!ContainsKey(kIncludedStates, txn_entry_pb.state())) {
continue;
}
int8_t entry_type;
int64_t txn_id;
RETURN_NOT_OK(iter.GetInt8(TxnStatusTablet::kEntryTypeColName, &entry_type));
CHECK_EQ(TxnStatusTablet::TRANSACTION, entry_type);
RETURN_NOT_OK(iter.GetInt64(TxnStatusTablet::kTxnIdColName, &txn_id));
AddTxnStatusRow(fields, txn_id, txn_entry_pb, &data_table);
}
}
return data_table.PrintTo(std::cout);
}
Status ShowTxn(const RunnerContext& context) {
const auto& txn_id_str = FindOrDie(context.required_args, kTxnIdArg);
int64_t txn_id = -1;
if (!SimpleAtoi(txn_id_str, &txn_id) || txn_id < 0) {
return Status::InvalidArgument(
Substitute("Must supply a valid transaction ID: $0", txn_id_str));
}
vector<string> txn_field_names;
vector<ListTxnsField> txn_fields;
vector<string> participant_field_names;
vector<ParticipantField> participant_fields;
RETURN_NOT_OK(GetFields(&txn_field_names, &txn_fields,
&participant_field_names, &participant_fields));
DCHECK_EQ(txn_field_names.size(), txn_fields.size());
DCHECK_EQ(participant_field_names.size(), participant_fields.size());
// First set up our clients so we can be sure we can connect to the cluster.
std::unique_ptr<TxnSystemClient> txn_client;
vector<HostPort> master_hps;
vector<string> master_addresses;
RETURN_NOT_OK(ParseMasterAddresses(context, kMasterAddressesArg, &master_addresses));
for (const auto& m : master_addresses) {
HostPort hp;
hp.ParseString(m, master::Master::kDefaultPort);
master_hps.emplace_back(hp);
}
RETURN_NOT_OK(TxnSystemClient::Create(master_hps,
FLAGS_sasl_protocol_name,
&txn_client));
RETURN_NOT_OK(txn_client->OpenTxnStatusTable());
shared_ptr<KuduClient> client;
RETURN_NOT_OK(CreateKuduClient(context, &client));
// Scan the transaction status table for its table entries.
shared_ptr<KuduTable> table;
RETURN_NOT_OK(client->OpenTable(TxnStatusTablet::kTxnStatusTableName, &table));
KuduScanner scanner(table.get());
RETURN_NOT_OK(scanner.SetFaultTolerant());
RETURN_NOT_OK(scanner.SetSelection(KuduClient::LEADER_ONLY));
auto* pred = table->NewComparisonPredicate(
TxnStatusTablet::kTxnIdColName, KuduPredicate::EQUAL, KuduValue::FromInt(txn_id));
RETURN_NOT_OK(scanner.AddConjunctPredicate(pred));
RETURN_NOT_OK(scanner.Open());
KuduScanBatch batch;
// Extract transaction statuses and participant IDs from the results.
vector<string> participant_ids;
DataTable txn_table(txn_field_names);
while (scanner.HasMoreRows()) {
RETURN_NOT_OK(scanner.NextBatch(&batch));
for (const auto& iter : batch) {
Slice metadata_bytes;
string metadata;
RETURN_NOT_OK(iter.GetString(TxnStatusTablet::kMetadataColName, &metadata_bytes));
int8_t entry_type;
int64_t fetched_txn_id;
RETURN_NOT_OK(iter.GetInt8(TxnStatusTablet::kEntryTypeColName, &entry_type));
if (entry_type == TxnStatusTablet::TRANSACTION && !txn_fields.empty()) {
TxnStatusEntryPB txn_entry_pb;
RETURN_NOT_OK(pb_util::ParseFromArray(&txn_entry_pb, metadata_bytes.data(),
metadata_bytes.size()));
RETURN_NOT_OK(iter.GetInt64(TxnStatusTablet::kTxnIdColName, &fetched_txn_id));
DCHECK_EQ(txn_id, fetched_txn_id);
AddTxnStatusRow(txn_fields, fetched_txn_id, txn_entry_pb, &txn_table);
} else if (entry_type == TxnStatusTablet::PARTICIPANT && !participant_fields.empty()) {
Slice participant_id;
RETURN_NOT_OK(iter.GetString(TxnStatusTablet::kIdentifierColName, &participant_id));
participant_ids.emplace_back(participant_id.ToString());
}
}
}
if (!txn_fields.empty()) {
RETURN_NOT_OK(txn_table.PrintTo(std::cout));
std::cout << std::endl;
}
if (participant_field_names.empty()) {
return Status::OK();
}
// Get further details from the participants.
DataTable participant_table(participant_field_names);
for (const auto& id : participant_ids) {
TxnMetadataPB meta_pb;
ParticipantOpPB pb;
pb.set_txn_id(txn_id);
pb.set_type(ParticipantOpPB::GET_METADATA);
RETURN_NOT_OK(txn_client->ParticipateInTransaction(
id, pb, MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_timeout_ms),
/*begin_commit_timestamp=*/nullptr, &meta_pb));
vector<string> col_vals;
for (const auto& field : participant_fields) {
switch (field) {
case ParticipantField::kTabletId:
col_vals.emplace_back(id);
break;
case ParticipantField::kAborted:
col_vals.emplace_back(meta_pb.aborted() ? "true" : "false");
break;
case ParticipantField::kFlushedCommittedMrs:
col_vals.emplace_back(meta_pb.flushed_committed_mrs() ? "true" : "false");
break;
case ParticipantField::kBeginCommitDateTime:
col_vals.emplace_back(meta_pb.has_commit_mvcc_op_timestamp() ?
HybridTimeToDateTime(meta_pb.commit_mvcc_op_timestamp()) : "<none>");
break;
case ParticipantField::kBeginCommitHybridTime:
col_vals.emplace_back(meta_pb.has_commit_mvcc_op_timestamp() ?
HybridClock::StringifyTimestamp(Timestamp(meta_pb.commit_mvcc_op_timestamp())) :
"<none>");
break;
case ParticipantField::kCommitDateTime:
col_vals.emplace_back(meta_pb.has_commit_timestamp() ?
HybridTimeToDateTime(meta_pb.commit_timestamp()) : "<none>");
break;
case ParticipantField::kCommitHybridTime:
col_vals.emplace_back(meta_pb.has_commit_timestamp() ?
HybridClock::StringifyTimestamp(Timestamp(meta_pb.commit_timestamp())) : "<none>");
break;
}
}
participant_table.AddRow(std::move(col_vals));
}
return participant_table.PrintTo(std::cout);
}
unique_ptr<Mode> BuildTxnMode() {
unique_ptr<Action> list =
ClusterActionBuilder("list", &ListTxns)
.Description("Show details of multi-row transactions in the cluster")
.AddOptionalParameter(
"columns",
string("txn_id,user,state,commit_datetime,start_datetime,last_transition_datetime"),
string("Comma-separated list of transaction-related info fields to include "
"in the output.\nPossible values: txn_id, user, state, commit_datetime, "
"commit_hybridtime, start_datetime, last_transition_datetime"))
.AddOptionalParameter("max_txn_id")
.AddOptionalParameter("min_txn_id")
.AddOptionalParameter("included_states")
.Build();
// TODO(awong): extend the GET_METADATA call for participants to also return
// table ID, partition info, etc. so we can display it.
unique_ptr<Action> show =
ClusterActionBuilder("show", &ShowTxn)
.AddRequiredParameter({ kTxnIdArg, "Transaction ID on which to operate" })
.AddOptionalParameter(
"columns",
string("txn_id,user,state,commit_datetime,start_datetime,last_transition_datetime,"
"participant_tablet_id,participant_begin_commit_datetime,"
"participant_commit_datetime"),
string("Comma-separated list of transaction-related info fields to include "
"in the output.\nPossible values: txn_id, user, state, commit_datetime, "
"commit_hybridtime, start_datetime, last_transition_datetime, "
"participant_tablet_id, participant_is_aborted, "
"participant_flushed_committed_mrs, participant_begin_commit_datetime, "
"participant_begin_commit_hybridtime, participant_commit_datetime, "
"participant_commit_hybridtime"))
.AddOptionalParameter("timeout_ms")
.Description("Show details of a specific transaction")
.Build();
return ModeBuilder("txn")
.Description("Operate on multi-row transactions")
.AddAction(std::move(list))
.AddAction(std::move(show))
.Build();
}
} // namespace tools
} // namespace kudu