blob: d2969ac6ae503c59609d9b1c60eb9ffdc4e25288 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tools/tool_action_common.h"
#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <iomanip>
#include <iostream>
#include <iterator>
#include <map>
#include <memory>
#include <numeric>
#include <set>
#include <stack>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
// IWYU pragma: no_include <yaml-cpp/node/impl.h>
// IWYU pragma: no_include <yaml-cpp/node/node.h>
#include "kudu/client/client-internal.h" // IWYU pragma: keep
#include "kudu/client/client.h"
#include "kudu/client/master_proxy_rpc.h"
#include "kudu/client/replica_controller-internal.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/common/common.pb.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus.proxy.h" // IWYU pragma: keep
#include "kudu/consensus/log.pb.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/fs/fs.pb.h"
#include "kudu/fs/default_key_provider.h"
#include "kudu/fs/key_provider.h"
#include "kudu/fs/ranger_kms_key_provider.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/escaping.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/stringpiece.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/master/master.h"
#include "kudu/master/master.pb.h"
#include "kudu/master/master.proxy.h" // IWYU pragma: keep
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/response_callback.h"
#include "kudu/rpc/rpc.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/server/server_base.pb.h"
#include "kudu/server/server_base.proxy.h"
#include "kudu/tools/tool.pb.h" // IWYU pragma: keep
#include "kudu/tools/tool_action.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/tserver/tserver_admin.proxy.h" // IWYU pragma: keep
#include "kudu/tserver/tserver_service.proxy.h" // IWYU pragma: keep
#include "kudu/util/async_util.h"
#include "kudu/util/env.h"
#include "kudu/util/flag_validators.h"
#include "kudu/util/jsonwriter.h"
#include "kudu/util/mem_tracker.pb.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/string_case.h"
#include "kudu/util/yamlreader.h"
DEFINE_bool(force, false, "If true, allows the set_flag command to set a flag "
"which is not explicitly marked as runtime-settable. Such flag "
"changes may be simply ignored on the server, or may cause the "
"server to crash.");
DEFINE_bool(print_meta, true, "Include metadata in output");
DEFINE_string(print_entries, "decoded",
"How to print entries:\n"
" false|0|no = don't print\n"
" true|1|yes|decoded = print them decoded\n"
" pb = print the raw protobuf\n"
" id = print only their ids");
DEFINE_string(table_name, "",
"Restrict output to a specific table by name");
DEFINE_string(tablets, "",
"Tablets to check (comma-separated list of IDs) "
"If not specified, checks all tablets.");
DEFINE_int64(timeout_ms, 1000 * 60, "RPC timeout in milliseconds");
DEFINE_int32(truncate_data, 100,
"Truncate the data fields to the given number of bytes "
"before printing. Set to 0 to disable");
DEFINE_string(columns, "", "Comma-separated list of column fields to include in output tables");
DEFINE_string(format, "pretty",
"Format to use for printing list output tables.\n"
"Possible values: pretty, space, tsv, csv, and json");
DEFINE_string(flag_tags, "", "Comma-separated list of tags used to restrict which "
"flags are returned. An empty value matches all tags");
DEFINE_bool(all_flags, false, "Whether to return all flags, or only flags that "
"were explicitly set.");
DEFINE_string(flags, "", "Comma-separated list of flags used to restrict which "
"flags are returned. An empty value means no restriction. "
"If non-empty, all_flags is ignored.");
DEFINE_string(tables, "", "Tables to include (comma-separated list of table names)"
"If not specified, includes all tables.");
DEFINE_string(memtracker_output, "table",
"One of 'json', 'json_compact' or 'table'. Table output flattens "
"the memtracker hierarchy.");
DEFINE_int32(num_threads, 2,
"Number of threads to run.");
static bool ValidateNumThreads(const char* flag_name, int32_t flag_value) {
if (flag_value <= 0) {
LOG(ERROR) << strings::Substitute("'$0' flag should have a positive value",
flag_name);
return false;
}
return true;
}
DEFINE_validator(num_threads, &ValidateNumThreads);
DEFINE_int64(negotiation_timeout_ms, 3000,
"Timeout for negotiating an RPC connection to a Kudu server, "
"in milliseconds");
DEFINE_string(sasl_protocol_name,
"kudu",
"SASL protocol name used to connnect to a Kerberos-enabled cluster. Must match the "
"servers' service principal name base (e.g. if it's \"kudu/_HOST\", then "
"sasl_protocol_name must be \"kudu\" to be able to connect.");
DEFINE_bool(row_count_only, false,
"Whether to only count rows instead of reading row cells: yields "
"an empty projection for the table");
DECLARE_bool(show_values);
DEFINE_string(instance_file, "",
"Path to the instance file containing the encrypted encryption key.");
DECLARE_string(encryption_key_provider);
DECLARE_string(ranger_kms_url);
DECLARE_string(encryption_cluster_key_name);
bool ValidateTimeoutSettings() {
if (FLAGS_timeout_ms < FLAGS_negotiation_timeout_ms) {
LOG(ERROR) << strings::Substitute(
"RPC timeout set by --timeout_ms should be not less than connection "
"negotiation timeout set by --negotiation_timeout_ms; "
"current settings are $0 and $1 correspondingly",
FLAGS_timeout_ms, FLAGS_negotiation_timeout_ms);
return false;
}
return true;
}
GROUP_FLAG_VALIDATOR(timeout_flags, ValidateTimeoutSettings);
bool ValidateSchemaProjectionFlags() {
if (FLAGS_row_count_only && !FLAGS_columns.empty()) {
LOG(ERROR) <<
"--row_count_only and --columns flags are conflicting: "
"either remove/unset --columns or remove/unset --row_count_only";
return false;
}
if (FLAGS_row_count_only && FLAGS_show_values) {
LOG(ERROR) <<
"--row_count_only and --show_values flags are conflicting: either "
"remove/unset --show_values or remove/unset --row_count_only";
return false;
}
return true;
}
GROUP_FLAG_VALIDATOR(schema_projection_flags, ValidateSchemaProjectionFlags);
using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::internal::AsyncLeaderMasterRpc;
using kudu::client::internal::ReplicaController;
using kudu::consensus::ConsensusServiceProxy; // NOLINT
using kudu::consensus::ReplicateMsg;
using kudu::log::LogEntryPB;
using kudu::log::LogEntryReader;
using kudu::log::ReadableLogSegment;
using kudu::master::ConnectToMasterRequestPB;
using kudu::master::ConnectToMasterResponsePB;
using kudu::master::MasterServiceProxy;
using kudu::pb_util::SecureDebugString;
using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::BackoffType;
using kudu::rpc::Messenger;
using kudu::rpc::MessengerBuilder;
using kudu::rpc::RequestIdPB;
using kudu::rpc::ResponseCallback;
using kudu::rpc::RpcController;
using kudu::security::KeyProvider;
using kudu::security::DefaultKeyProvider;
using kudu::security::RangerKMSKeyProvider;
using kudu::server::GenericServiceProxy;
using kudu::server::GetFlagsRequestPB;
using kudu::server::GetFlagsResponsePB;
using kudu::server::GetStatusRequestPB;
using kudu::server::GetStatusResponsePB;
using kudu::server::ServerClockRequestPB;
using kudu::server::ServerClockResponsePB;
using kudu::server::ServerStatusPB;
using kudu::server::SetFlagRequestPB;
using kudu::server::SetFlagResponsePB;
using kudu::tserver::ListTabletsRequestPB;
using kudu::tserver::ListTabletsResponsePB;
using kudu::tserver::TabletServerAdminServiceProxy; // NOLINT
using kudu::tserver::TabletServerServiceProxy; // NOLINT
using kudu::tserver::WriteRequestPB;
using std::cout;
using std::endl;
using std::map;
using std::ostream;
using std::set;
using std::setfill;
using std::setw;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::a2b_hex;
using strings::Split;
using strings::Substitute;
namespace kudu {
namespace tools {
const char* const kMasterAddressesArg = "master_addresses";
const char* const kMasterAddressesArgDesc = "Either comma-separated list of Kudu "
"master addresses where each address is of form 'hostname:port', or a cluster name if it has "
"been configured in ${KUDU_CONFIG}/kudurc";
const char* const kDestMasterAddressesArg = "dest_master_addresses";
const char* const kDestMasterAddressesArgDesc = "Either comma-separated list of destination Kudu "
"master addresses where each address is of form 'hostname:port', or a cluster name if it has "
"been configured in ${KUDU_CONFIG}/kudurc";
const char* const kTableNameArg = "table_name";
const char* const kTabletIdArg = "tablet_id";
const char* const kTabletIdArgDesc = "Tablet Identifier";
const char* const kTabletIdsCsvArg = "tablet_ids";
const char* const kTabletIdsCsvArgDesc =
"Comma-separated list of Tablet Identifiers";
const char* const kMasterAddressArg = "master_address";
const char* const kMasterAddressDesc = "Address of a Kudu Master of form "
"'hostname:port'. Port may be omitted if the Master is bound to the "
"default port.";
const char* const kTServerAddressArg = "tserver_address";
const char* const kTServerAddressDesc = "Address of a Kudu Tablet Server of "
"form 'hostname:port'. Port may be omitted if the Tablet Server is bound "
"to the default port.";
namespace {
enum PrintEntryType {
DONT_PRINT,
PRINT_PB,
PRINT_DECODED,
PRINT_ID
};
PrintEntryType ParsePrintType() {
if (ParseLeadingBoolValue(FLAGS_print_entries.c_str(), true) == false) {
return DONT_PRINT;
} else if (ParseLeadingBoolValue(FLAGS_print_entries.c_str(), false) == true ||
FLAGS_print_entries == "decoded") {
return PRINT_DECODED;
} else if (FLAGS_print_entries == "pb") {
return PRINT_PB;
} else if (FLAGS_print_entries == "id") {
return PRINT_ID;
} else {
LOG(FATAL) << "Unknown value for --print_entries: " << FLAGS_print_entries;
}
}
void PrintIdOnly(const LogEntryPB& entry) {
switch (entry.type()) {
case log::REPLICATE:
{
cout << entry.replicate().id().term() << "." << entry.replicate().id().index()
<< "@" << entry.replicate().timestamp() << "\t";
cout << "REPLICATE "
<< OperationType_Name(entry.replicate().op_type());
break;
}
case log::COMMIT:
{
cout << "COMMIT " << entry.commit().commited_op_id().term()
<< "." << entry.commit().commited_op_id().index();
break;
}
default:
cout << "UNKNOWN: " << SecureShortDebugString(entry);
}
cout << endl;
}
Status PrintDecodedWriteRequestPB(const string& indent,
const Schema& tablet_schema,
const WriteRequestPB& write,
const RequestIdPB* request_id) {
Schema request_schema;
RETURN_NOT_OK(SchemaFromPB(write.schema(), &request_schema));
Arena arena(32 * 1024);
RowOperationsPBDecoder dec(&write.row_operations(), &request_schema, &tablet_schema, &arena);
vector<DecodedRowOperation> ops;
RETURN_NOT_OK(dec.DecodeOperations<DecoderMode::WRITE_OPS>(&ops));
cout << indent << "Tablet: " << write.tablet_id() << endl;
cout << indent << "RequestId: "
<< (request_id ? SecureShortDebugString(*request_id) : "None") << endl;
cout << indent << "Consistency: "
<< ExternalConsistencyMode_Name(write.external_consistency_mode()) << endl;
if (write.has_propagated_timestamp()) {
cout << indent << "Propagated TS: " << write.propagated_timestamp() << endl;
}
int i = 0;
for (const DecodedRowOperation& op : ops) {
// TODO (KUDU-515): Handle the case when a tablet's schema changes
// mid-segment.
cout << indent << "op " << (i++) << ": " << op.ToString(tablet_schema) << endl;
}
return Status::OK();
}
Status PrintDecoded(const LogEntryPB& entry, Schema* tablet_schema) {
PrintIdOnly(entry);
const string indent = "\t";
if (entry.has_replicate()) {
// We can actually decode REPLICATE messages.
const ReplicateMsg& replicate = entry.replicate();
if (replicate.op_type() == consensus::WRITE_OP) {
RETURN_NOT_OK(PrintDecodedWriteRequestPB(
indent,
*tablet_schema,
replicate.write_request(),
replicate.has_request_id() ? &replicate.request_id() : nullptr));
} else if (replicate.op_type() == consensus::ALTER_SCHEMA_OP) {
if (!replicate.has_alter_schema_request()) {
LOG(ERROR) << "read an ALTER_SCHEMA_OP log entry, but has no alter_schema_request";
return Status::RuntimeError("ALTER_SCHEMA_OP log entry has no alter_schema_request");
}
RETURN_NOT_OK(SchemaFromPB(replicate.alter_schema_request().schema(), tablet_schema));
cout << indent << SecureShortDebugString(replicate) << endl;
} else {
cout << indent << SecureShortDebugString(replicate) << endl;
}
} else if (entry.has_commit()) {
// For COMMIT we'll just dump the PB
cout << indent << SecureShortDebugString(entry.commit()) << endl;
}
return Status::OK();
}
// A valid 'cluster name' is beginning with a special character '@'.
// '@' is a character which has no special significance in shells and
// it's an invalid character in hostname list, so we can use it to
// distinguish cluster name from master addresses.
bool GetClusterName(const string& master_addresses_str, string* cluster_name) {
CHECK(cluster_name);
if (HasPrefixString(master_addresses_str, "@")) {
*cluster_name = master_addresses_str.substr(1); // Trim the first '@'.
return true;
}
return false;
}
// Retrieve flags from a remote server.
//
// If 'address' does not contain a port, 'default_port' is used instead.
//
// 'all_flags' controls whether all flags are returned, or only flags which are
// explicitly set.
//
// 'flag_tags' is a comma-separated list of tags used to restrict which flags
// are returned. An empty value matches all tags.
Status GetServerFlags(const string& address,
uint16_t default_port,
bool all_flags,
const string& flags_to_get,
const string& flag_tags,
vector<server::GetFlagsResponsePB_Flag>* flags) {
unique_ptr<GenericServiceProxy> proxy;
RETURN_NOT_OK(BuildProxy(address, default_port, &proxy));
GetFlagsRequestPB req;
GetFlagsResponsePB resp;
RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
req.set_all_flags(all_flags);
for (StringPiece tag : strings::Split(flag_tags, ",", strings::SkipEmpty())) {
req.add_tags(tag.as_string());
}
for (StringPiece flag: strings::Split(flags_to_get, ",", strings::SkipEmpty())) {
req.add_flags(flag.as_string());
}
RETURN_NOT_OK(proxy->GetFlags(req, &resp, &rpc));
flags->clear();
std::move(resp.flags().begin(), resp.flags().end(), std::back_inserter(*flags));
return Status::OK();
}
} // anonymous namespace
RpcActionBuilder::RpcActionBuilder(std::string name, ActionRunner runner)
: ActionBuilder(std::move(name), std::move(runner)) {
}
unique_ptr<Action> RpcActionBuilder::Build() {
AddOptionalParameter("negotiation_timeout_ms");
AddOptionalParameter("timeout_ms");
return ActionBuilder::Build();
}
ClusterActionBuilder::ClusterActionBuilder(std::string name, ActionRunner runner)
: RpcActionBuilder(std::move(name), std::move(runner)) {
AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc });
}
MasterActionBuilder::MasterActionBuilder(std::string name, ActionRunner runner)
: RpcActionBuilder(std::move(name), std::move(runner)) {
AddRequiredParameter({ kMasterAddressArg, kMasterAddressDesc });
}
TServerActionBuilder::TServerActionBuilder(std::string name, ActionRunner runner)
: RpcActionBuilder(std::move(name), std::move(runner)) {
AddRequiredParameter({ kTServerAddressArg, kTServerAddressDesc });
}
Status BuildMessenger(std::string name, shared_ptr<Messenger>* messenger) {
shared_ptr<Messenger> m;
Status s = MessengerBuilder(std::move(name))
.set_rpc_negotiation_timeout_ms(FLAGS_negotiation_timeout_ms)
.set_sasl_proto_name(FLAGS_sasl_protocol_name)
.Build(&m);
if (s.ok()) {
*messenger = std::move(m);
}
return s;
}
template<class ProxyClass>
Status BuildProxy(const string& address,
uint16_t default_port,
unique_ptr<ProxyClass>* proxy) {
HostPort hp;
RETURN_NOT_OK(hp.ParseString(address, default_port));
vector<Sockaddr> resolved;
RETURN_NOT_OK(hp.ResolveAddresses(&resolved));
shared_ptr<Messenger> messenger;
RETURN_NOT_OK(BuildMessenger("tool", &messenger));
proxy->reset(new ProxyClass(messenger, resolved[0], hp.host()));
return Status::OK();
}
// Explicit specialization for callers outside this compilation unit.
template
Status BuildProxy(const string& address,
uint16_t default_port,
unique_ptr<ConsensusServiceProxy>* proxy);
template
Status BuildProxy(const string& address,
uint16_t default_port,
unique_ptr<TabletServerServiceProxy>* proxy);
template
Status BuildProxy(const string& address,
uint16_t default_port,
unique_ptr<TabletServerAdminServiceProxy>* proxy);
template
Status BuildProxy(const string& address,
uint16_t default_port,
unique_ptr<MasterServiceProxy>* proxy);
Status GetServerStatus(const string& address, uint16_t default_port,
ServerStatusPB* status) {
unique_ptr<GenericServiceProxy> proxy;
RETURN_NOT_OK(BuildProxy(address, default_port, &proxy));
GetStatusRequestPB req;
GetStatusResponsePB resp;
RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
RETURN_NOT_OK(proxy->GetStatus(req, &resp, &rpc));
if (!resp.has_status()) {
return Status::Incomplete("Server response did not contain status",
proxy->ToString());
}
*status = resp.status();
return Status::OK();
}
Status GetReplicas(TabletServerServiceProxy* proxy,
vector<ListTabletsResponsePB::StatusAndSchemaPB>* replicas) {
ListTabletsRequestPB req;
req.set_need_schema_info(true);
ListTabletsResponsePB resp;
RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
RETURN_NOT_OK(proxy->ListTablets(req, &resp, &rpc));
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
}
replicas->assign(resp.status_and_schema().begin(),
resp.status_and_schema().end());
return Status::OK();
}
Status PrintSegment(const scoped_refptr<ReadableLogSegment>& segment) {
PrintEntryType print_type = ParsePrintType();
if (FLAGS_print_meta) {
cout << "Header:\n" << SecureDebugString(segment->header());
}
if (print_type != DONT_PRINT) {
Schema tablet_schema;
RETURN_NOT_OK(SchemaFromPB(segment->header().schema(), &tablet_schema));
LogEntryReader reader(segment.get());
while (true) {
unique_ptr<LogEntryPB> entry;
Status s = reader.ReadNextEntry(&entry);
if (s.IsEndOfFile()) {
break;
}
RETURN_NOT_OK(s);
if (print_type == PRINT_PB) {
if (FLAGS_truncate_data > 0) {
pb_util::TruncateFields(entry.get(), FLAGS_truncate_data);
}
cout << "Entry:\n" << SecureDebugString(*entry);
} else if (print_type == PRINT_DECODED) {
RETURN_NOT_OK(PrintDecoded(*entry, &tablet_schema));
} else if (print_type == PRINT_ID) {
PrintIdOnly(*entry);
}
}
}
if (FLAGS_print_meta && segment->HasFooter()) {
cout << "Footer:\n" << SecureDebugString(segment->footer());
}
return Status::OK();
}
Status PrintServerFlags(const string& address, uint16_t default_port) {
vector<server::GetFlagsResponsePB_Flag> flags;
RETURN_NOT_OK(GetServerFlags(address, default_port, FLAGS_all_flags,
FLAGS_flags, FLAGS_flag_tags, &flags));
std::sort(flags.begin(), flags.end(),
[](const GetFlagsResponsePB::Flag& left,
const GetFlagsResponsePB::Flag& right) {
return left.name() < right.name();
});
DataTable table({ "flag", "value", "default value?", "tags" });
vector<string> tags;
for (const auto& flag : flags) {
tags.clear();
std::copy(flag.tags().begin(), flag.tags().end(), std::back_inserter(tags));
std::sort(tags.begin(), tags.end());
table.AddRow({ flag.name(),
flag.value(),
flag.is_default_value() ? "true" : "false",
JoinStrings(tags, ",") });
}
return table.PrintTo(cout);
}
Status SetServerFlag(const string& address, uint16_t default_port,
const string& flag, const string& value) {
unique_ptr<GenericServiceProxy> proxy;
RETURN_NOT_OK(BuildProxy(address, default_port, &proxy));
SetFlagRequestPB req;
SetFlagResponsePB resp;
RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
req.set_flag(flag);
req.set_value(value);
req.set_force(FLAGS_force);
RETURN_NOT_OK(proxy->SetFlag(req, &resp, &rpc));
switch (resp.result()) {
case server::SetFlagResponsePB::SUCCESS:
return Status::OK();
case server::SetFlagResponsePB::NOT_SAFE:
return Status::RemoteError(resp.msg() +
" (use --force flag to allow anyway)");
default:
return Status::RemoteError(SecureShortDebugString(resp));
}
}
bool MatchesAnyPattern(const vector<string>& patterns, const string& str) {
// Consider no filter a wildcard.
if (patterns.empty()) return true;
for (const auto& p : patterns) {
if (MatchPattern(str, p)) return true;
}
return false;
}
Status CreateKuduClient(const vector<string>& master_addresses,
client::sp::shared_ptr<KuduClient>* client,
bool can_see_all_replicas) {
auto rpc_timeout = MonoDelta::FromMilliseconds(FLAGS_timeout_ms);
auto negotiation_timeout = MonoDelta::FromMilliseconds(
FLAGS_negotiation_timeout_ms);
KuduClientBuilder b;
if (can_see_all_replicas) {
ReplicaController::SetVisibility(&b, ReplicaController::Visibility::ALL);
}
return b.master_server_addrs(master_addresses)
.default_rpc_timeout(rpc_timeout)
.default_admin_operation_timeout(rpc_timeout)
.connection_negotiation_timeout(negotiation_timeout)
.sasl_protocol_name(FLAGS_sasl_protocol_name)
.Build(client);
}
Status CreateKuduClient(const RunnerContext& context,
const char* master_addresses_arg,
client::sp::shared_ptr<KuduClient>* client) {
vector<string> master_addresses;
RETURN_NOT_OK(ParseMasterAddresses(context, master_addresses_arg,
&master_addresses));
return CreateKuduClient(master_addresses, client);
}
Status CreateKuduClient(const RunnerContext& context,
client::sp::shared_ptr<KuduClient>* client) {
return CreateKuduClient(context, kMasterAddressesArg, client);
}
Status ParseMasterAddressesStr(const RunnerContext& context,
const char* master_addresses_arg,
string* master_addresses_str) {
CHECK(master_addresses_str);
*master_addresses_str = FindOrDie(context.required_args, master_addresses_arg);
string cluster_name;
if (!GetClusterName(*master_addresses_str, &cluster_name)) {
// Treat it as master addresses.
return Status::OK();
}
// Try to resolve cluster name.
char* kudu_config_path = getenv("KUDU_CONFIG");
if (!kudu_config_path) {
return Status::NotFound("${KUDU_CONFIG} is missing");
}
auto config_file = JoinPathSegments(kudu_config_path, "kudurc");
if (!Env::Default()->FileExists(config_file)) {
return Status::NotFound(Substitute("configuration file $0 was not found", config_file));
}
YamlReader reader(config_file);
RETURN_NOT_OK(reader.Init());
YAML::Node clusters_info;
RETURN_NOT_OK(YamlReader::ExtractMap(reader.node(), "clusters_info", &clusters_info));
YAML::Node cluster_info;
RETURN_NOT_OK(YamlReader::ExtractMap(&clusters_info, cluster_name, &cluster_info));
RETURN_NOT_OK(YamlReader::ExtractScalar(&cluster_info, "master_addresses",
master_addresses_str));
return Status::OK();
}
Status ParseMasterAddressesStr(
const RunnerContext& context,
string* master_addresses_str) {
CHECK(master_addresses_str);
return ParseMasterAddressesStr(context, kMasterAddressesArg, master_addresses_str);
}
Status ParseMasterAddresses(const RunnerContext& context,
const char* master_addresses_arg,
vector<string>* master_addresses) {
CHECK(master_addresses);
string master_addresses_str;
RETURN_NOT_OK(ParseMasterAddressesStr(context, master_addresses_arg, &master_addresses_str));
vector<string> master_addresses_local = strings::Split(master_addresses_str, ",");
std::unordered_set<string> unique_masters;
std::unordered_set<string> duplicate_masters;
// Loop through the master addresses to find the duplicate. If there is no master specified
// do not report as a duplicate
for (const auto& master : master_addresses_local) {
if (master.empty()) continue;
if (ContainsKey(unique_masters, master)) {
duplicate_masters.insert(master);
} else {
unique_masters.insert(master);
}
}
if (!duplicate_masters.empty()) {
return Status::InvalidArgument(
"Duplicate master addresses specified: " + JoinStrings(duplicate_masters, ","));
}
*master_addresses = std::move(master_addresses_local);
return Status::OK();
}
Status ParseMasterAddresses(
const RunnerContext& context,
vector<string>* master_addresses) {
CHECK(master_addresses);
return ParseMasterAddresses(context, kMasterAddressesArg, master_addresses);
}
Status MasterAddressesToSet(
const string& master_addresses_arg,
UnorderedHostPortSet* res) {
res->clear();
vector<HostPort> hp_vector;
RETURN_NOT_OK(HostPort::ParseStrings(master_addresses_arg, master::Master::kDefaultPort,
&hp_vector));
*res = UnorderedHostPortSet(hp_vector.begin(), hp_vector.end());
// If we deduplicated some masters addresses, log something about it.
if (res->size() < hp_vector.size()) {
vector<HostPort> addr_list(res->begin(), res->end());
LOG(INFO) << "deduplicated master addresses: "
<< HostPort::ToCommaSeparatedString(addr_list);
}
return Status::OK();
}
Status VerifyMasterAddressList(const vector<string>& master_addresses) {
map<string, set<string>> addresses_per_master;
for (const auto& address : master_addresses) {
unique_ptr<MasterServiceProxy> proxy;
RETURN_NOT_OK(BuildProxy(address, master::Master::kDefaultPort, &proxy));
RpcController ctl;
ctl.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
ConnectToMasterRequestPB req;
ConnectToMasterResponsePB resp;
RETURN_NOT_OK(proxy->ConnectToMaster(req, &resp, &ctl));
const auto& resp_master_addrs = resp.master_addrs();
if (resp_master_addrs.size() != master_addresses.size()) {
const auto addresses_provided = JoinStrings(master_addresses, ",");
const auto addresses_cluster_config =
JoinMapped(resp_master_addrs,
[](const HostPortPB& pb) { return Substitute("$0:$1", pb.host(), pb.port()); },
",");
return Status::InvalidArgument(
Substitute("list of master addresses provided ($0) "
"does not match the actual cluster configuration ($1) ",
addresses_provided,
addresses_cluster_config));
}
set<string> addr_set;
for (const auto& hp : resp_master_addrs) {
addr_set.emplace(Substitute("$0:$1", hp.host(), hp.port()));
}
addresses_per_master.emplace(address, std::move(addr_set));
}
bool mismatch = false;
if (addresses_per_master.size() > 1) {
const auto it_0 = addresses_per_master.cbegin();
auto it_1 = addresses_per_master.begin();
++it_1;
for (auto it = it_1; it != addresses_per_master.end(); ++it) {
if (it->second != it_0->second) {
mismatch = true;
break;
}
}
}
if (mismatch) {
string err_msg = Substitute("specified: ($0);", JoinStrings(master_addresses, ","));
for (const auto& e : addresses_per_master) {
err_msg += Substitute(" from master $0: ($1);", e.first, JoinStrings(e.second, ","));
}
return Status::ConfigurationError(Substitute("master address lists mismatch: $0", err_msg));
}
return Status::OK();
}
Status PrintServerStatus(const string& address, uint16_t default_port) {
ServerStatusPB status;
RETURN_NOT_OK(GetServerStatus(address, default_port, &status));
cout << SecureDebugString(status) << endl;
return Status::OK();
}
Status PrintServerTimestamp(const string& address, uint16_t default_port) {
unique_ptr<GenericServiceProxy> proxy;
RETURN_NOT_OK(BuildProxy(address, default_port, &proxy));
ServerClockRequestPB req;
ServerClockResponsePB resp;
RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
RETURN_NOT_OK(proxy->ServerClock(req, &resp, &rpc));
if (!resp.has_timestamp()) {
return Status::Incomplete("Server response did not contain timestamp",
proxy->ToString());
}
cout << resp.timestamp() << endl;
return Status::OK();
}
Status DumpMemTrackers(const string& address, uint16_t default_port) {
unique_ptr<GenericServiceProxy> proxy;
RETURN_NOT_OK(BuildProxy(address, default_port, &proxy));
server::DumpMemTrackersRequestPB req;
server::DumpMemTrackersResponsePB resp;
RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
RETURN_NOT_OK(proxy->DumpMemTrackers(req, &resp, &rpc));
if (iequals(FLAGS_memtracker_output, "json")) {
cout << JsonWriter::ToJson(resp.root_tracker(), JsonWriter::Mode::PRETTY)
<< endl;
} else if (iequals(FLAGS_memtracker_output, "json_compact")) {
cout << JsonWriter::ToJson(resp.root_tracker(), JsonWriter::Mode::COMPACT)
<< endl;
} else if (iequals(FLAGS_memtracker_output, "table")) {
DataTable table({ "id", "parent_id", "limit",
"current consumption", "peak_consumption" });
const auto& root = resp.root_tracker();
std::stack<const MemTrackerPB*> to_process;
to_process.push(&root);
while (!to_process.empty()) {
const auto* tracker = to_process.top();
to_process.pop();
table.AddRow({ tracker->id(),
tracker->has_parent_id() ? tracker->parent_id() : "<none>",
std::to_string(tracker->limit()),
std::to_string(tracker->current_consumption()),
std::to_string(tracker->peak_consumption()) });
for (const auto& child_tracker : tracker->child_trackers()) {
to_process.push(&child_tracker);
}
}
RETURN_NOT_OK(table.PrintTo(cout));
} else {
return Status::InvalidArgument("unknown output type (--memtracker_output)",
FLAGS_memtracker_output);
}
return Status::OK();
}
Status GetKuduToolAbsolutePathSafe(string* path) {
static const char* const kKuduCtlFileName = "kudu";
string exe;
RETURN_NOT_OK(Env::Default()->GetExecutablePath(&exe));
const string binroot = DirName(exe);
string tool_abs_path = JoinPathSegments(binroot, kKuduCtlFileName);
if (!Env::Default()->FileExists(tool_abs_path)) {
return Status::NotFound(Substitute(
"$0 binary not found at $1", kKuduCtlFileName, tool_abs_path));
}
*path = std::move(tool_abs_path);
return Status::OK();
}
Status SetServerKey() {
if (FLAGS_instance_file.empty()) {
return Status::OK();
}
InstanceMetadataPB instance;
RETURN_NOT_OK_PREPEND(pb_util::ReadPBContainerFromPath(Env::Default(), FLAGS_instance_file,
&instance, pb_util::NOT_SENSITIVE),
"Could not open instance file");
if (string key = instance.server_key();
!key.empty()) {
unique_ptr<security::KeyProvider> key_provider;
if (FLAGS_encryption_key_provider == "ranger-kms"
|| FLAGS_encryption_key_provider == "ranger_kms") {
key_provider.reset(new RangerKMSKeyProvider(FLAGS_ranger_kms_url,
FLAGS_encryption_cluster_key_name));
} else {
key_provider.reset(new DefaultKeyProvider());
}
string server_key;
RETURN_NOT_OK(key_provider->DecryptServerKey(instance.server_key(),
instance.server_key_iv(),
instance.server_key_version(),
&server_key));
Env::Default()->SetEncryptionKey(reinterpret_cast<const uint8_t*>(a2b_hex(server_key).c_str()),
key.length() * 4);
}
return Status::OK();
}
namespace {
// Pretty print a table using the psql format. For example:
//
// uuid | rpc-addresses | seqno
// ----------------------------------+--------------------------------+------------------
// 335d132897de4bdb9b87443f2c487a42 | 126.rack1.dc1.example.com:7050 | 1492596790237811
// 7425c65d80f54f2da0a85494a5eb3e68 | 122.rack1.dc1.example.com:7050 | 1492596755322350
// dd23284d3a334f1a8306c19d89c1161f | 130.rack1.dc1.example.com:7050 | 1492596704536543
// d8009e07d82b4e66a7ab50f85e60bc30 | 136.rack1.dc1.example.com:7050 | 1492596696557549
// c108a85a68504c2bb9f49e4ee683d981 | 128.rack1.dc1.example.com:7050 | 1492596646623301
void PrettyPrintTable(const vector<string>& headers,
const vector<vector<string>>& columns,
ostream& out) {
CHECK_EQ(headers.size(), columns.size());
if (headers.empty()) return;
size_t num_columns = headers.size();
vector<size_t> widths;
for (int col = 0; col < num_columns; col++) {
size_t width = std::accumulate(columns[col].begin(), columns[col].end(), headers[col].size(),
[](size_t acc, const string& cell) {
return std::max(acc, cell.size());
});
widths.push_back(width);
}
// Print the header row.
for (int col = 0; col < num_columns; col++) {
int padding = widths[col] - headers[col].size();
out << setw(padding / 2) << "" << " " << headers[col];
if (col != num_columns - 1) out << setw((padding + 1) / 2) << "" << " |";
}
out << endl;
// Print the separator row.
out << setfill('-');
for (int col = 0; col < num_columns; col++) {
out << setw(widths[col] + 2) << "";
if (col != num_columns - 1) out << "+";
}
out << endl;
// Print the data rows.
out << setfill(' ');
int num_rows = columns.empty() ? 0 : columns[0].size();
for (int row = 0; row < num_rows; row++) {
for (int col = 0; col < num_columns; col++) {
const auto& value = columns[col][row];
out << " " << value;
if (col != num_columns - 1) {
size_t padding = widths[col] - value.size();
out << setw(padding) << "" << " |";
}
}
out << endl;
}
}
// Print a table using JSON formatting.
//
// The table is formatted as an array of objects. Each object corresponds
// to a row whose fields are the column values.
void JsonPrintTable(const vector<string>& headers,
const vector<vector<string>>& columns,
ostream& out) {
std::ostringstream stream;
JsonWriter writer(&stream, JsonWriter::COMPACT);
int num_columns = columns.size();
int num_rows = columns.empty() ? 0 : columns[0].size();
writer.StartArray();
for (int row = 0; row < num_rows; row++) {
writer.StartObject();
for (int col = 0; col < num_columns; col++) {
writer.String(headers[col]);
writer.String(columns[col][row]);
}
writer.EndObject();
}
writer.EndArray();
out << stream.str() << endl;
}
// Print the table using the provided separator. For example, with a comma
// separator:
//
// 335d132897de4bdb9b87443f2c487a42,126.rack1.dc1.example.com:7050,1492596790237811
// 7425c65d80f54f2da0a85494a5eb3e68,122.rack1.dc1.example.com:7050,1492596755322350
// dd23284d3a334f1a8306c19d89c1161f,130.rack1.dc1.example.com:7050,1492596704536543
// d8009e07d82b4e66a7ab50f85e60bc30,136.rack1.dc1.example.com:7050,1492596696557549
// c108a85a68504c2bb9f49e4ee683d981,128.rack1.dc1.example.com:7050,1492596646623301
void PrintTable(const vector<vector<string>>& columns, const string& separator, ostream& out) {
// TODO(dan): proper escaping of string values.
int num_columns = columns.size();
int num_rows = columns.empty() ? 0 : columns[0].size();
for (int row = 0; row < num_rows; row++) {
for (int col = 0; col < num_columns; col++) {
out << columns[col][row];
if (col != num_columns - 1) out << separator;
}
out << endl;
}
}
} // anonymous namespace
DataTable::DataTable(vector<string> col_names)
: column_names_(std::move(col_names)),
columns_(column_names_.size()) {
}
void DataTable::AddRow(vector<string> row) {
CHECK_EQ(row.size(), columns_.size());
int i = 0;
for (auto& v : row) {
columns_[i++].emplace_back(std::move(v));
}
}
void DataTable::AddColumn(string name, vector<string> column) {
if (!columns_.empty()) {
CHECK_EQ(column.size(), columns_[0].size());
}
column_names_.emplace_back(std::move(name));
columns_.emplace_back(std::move(column));
}
Status DataTable::PrintTo(ostream& out) const {
if (iequals(FLAGS_format, "pretty")) {
PrettyPrintTable(column_names_, columns_, out);
} else if (iequals(FLAGS_format, "space")) {
PrintTable(columns_, " ", out);
} else if (iequals(FLAGS_format, "tsv")) {
PrintTable(columns_, " ", out);
} else if (iequals(FLAGS_format, "csv")) {
PrintTable(columns_, ",", out);
} else if (iequals(FLAGS_format, "json")) {
JsonPrintTable(column_names_, columns_, out);
} else {
return Status::InvalidArgument("unknown format (--format)", FLAGS_format);
}
return Status::OK();
}
LeaderMasterProxy::LeaderMasterProxy(client::sp::shared_ptr<KuduClient> client)
: client_(std::move(client)) {
}
Status LeaderMasterProxy::Init(const vector<string>& master_addrs,
const MonoDelta& timeout,
const MonoDelta& connection_negotiation_timeout) {
return KuduClientBuilder()
.master_server_addrs(master_addrs)
.default_rpc_timeout(timeout)
.default_admin_operation_timeout(timeout)
.connection_negotiation_timeout(connection_negotiation_timeout)
.sasl_protocol_name(FLAGS_sasl_protocol_name)
.Build(&client_);
}
Status LeaderMasterProxy::Init(const RunnerContext& context) {
vector<string> master_addresses;
RETURN_NOT_OK(ParseMasterAddresses(context, &master_addresses));
return Init(
master_addresses,
MonoDelta::FromMilliseconds(FLAGS_timeout_ms),
MonoDelta::FromMilliseconds(FLAGS_negotiation_timeout_ms));
}
template<typename Req, typename Resp>
Status LeaderMasterProxy::SyncRpc(const Req& req,
Resp* resp,
string func_name,
const std::function<void(master::MasterServiceProxy*,
const Req&, Resp*,
rpc::RpcController*,
const ResponseCallback&)>& func,
std::vector<uint32_t> required_feature_flags) {
MonoTime deadline = MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_timeout_ms);
Synchronizer sync;
AsyncLeaderMasterRpc<Req, Resp> rpc(deadline, client_.get(), BackoffType::EXPONENTIAL,
req, resp, func, std::move(func_name), sync.AsStatusCallback(),
std::move(required_feature_flags));
rpc.SendRpc();
return sync.Wait();
}
// Explicit specializations for callers outside this compilation unit.
template
Status LeaderMasterProxy::SyncRpc(
const master::AddMasterRequestPB& req,
master::AddMasterResponsePB* resp,
string func_name,
const std::function<void(MasterServiceProxy*,
const master::AddMasterRequestPB&,
master::AddMasterResponsePB*,
RpcController*,
const ResponseCallback&)>& func,
std::vector<uint32_t> required_feature_flags);
template
Status LeaderMasterProxy::SyncRpc(
const master::ChangeTServerStateRequestPB& req,
master::ChangeTServerStateResponsePB* resp,
string func_name,
const std::function<void(MasterServiceProxy*,
const master::ChangeTServerStateRequestPB&,
master::ChangeTServerStateResponsePB*,
RpcController*,
const ResponseCallback&)>& func,
std::vector<uint32_t> required_feature_flags);
template
Status LeaderMasterProxy::SyncRpc(
const master::ListTabletServersRequestPB& req,
master::ListTabletServersResponsePB* resp,
string func_name,
const std::function<void(MasterServiceProxy*,
const master::ListTabletServersRequestPB&,
master::ListTabletServersResponsePB*,
RpcController*,
const ResponseCallback&)>& func,
std::vector<uint32_t> required_feature_flags);
template
Status LeaderMasterProxy::SyncRpc(
const master::ListMastersRequestPB& req,
master::ListMastersResponsePB* resp,
string func_name,
const std::function<void(MasterServiceProxy*,
const master::ListMastersRequestPB&,
master::ListMastersResponsePB*,
RpcController*,
const ResponseCallback&)>& func,
std::vector<uint32_t> required_feature_flags);
template
Status LeaderMasterProxy::SyncRpc(
const master::RemoveMasterRequestPB& req,
master::RemoveMasterResponsePB* resp,
string func_name,
const std::function<void(MasterServiceProxy*,
const master::RemoveMasterRequestPB&,
master::RemoveMasterResponsePB*,
RpcController*,
const ResponseCallback&)>& func,
std::vector<uint32_t> required_feature_flags);
template
Status LeaderMasterProxy::SyncRpc(
const master::ReplaceTabletRequestPB& req,
master::ReplaceTabletResponsePB* resp,
string func_name,
const std::function<void(MasterServiceProxy*,
const master::ReplaceTabletRequestPB&,
master::ReplaceTabletResponsePB*,
RpcController*,
const ResponseCallback&)>& func,
std::vector<uint32_t> required_feature_flags);
} // namespace tools
} // namespace kudu