blob: 91fe78b47dbc4a50527d7d59c98f8e916ac3781c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdint>
#include <functional>
#include <iostream>
#include <limits>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/client/client.h"
#include "kudu/client/row_result.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/scanner-internal.h"
#include "kudu/client/schema.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/partition.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/human_readable.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/server/server_base.pb.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tools/tool_action.h"
#include "kudu/tools/tool_action_common.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/tserver/tserver_admin.proxy.h"
#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/util/monotime.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
// Command-line flags consumed by the actions in this file.
//
// --force_copy: when set, CopyReplica() sends the maximum int64 value as the
// caller term in its StartTabletCopy request.
DEFINE_bool(force_copy, false,
"Force the copy when the destination tablet server has this replica");
// --include_schema: when set, ListReplicas() prints the schema of every
// replica it lists.
DEFINE_bool(include_schema, true,
"Whether to include the schema of each replica");
// Flags declared here but defined in another translation unit.
// --table_name and --tablets act as filters in ListReplicas(); --timeout_ms
// is applied as the timeout of the RPCs issued by this file.
DECLARE_string(table_name);
DECLARE_string(tablets);
DECLARE_int64(timeout_ms); // defined in ksck
using kudu::client::KuduRowResult;
using kudu::client::KuduScanBatch;
using kudu::client::KuduSchema;
using kudu::consensus::ConsensusServiceProxy;
using kudu::consensus::RaftConfigPB;
using kudu::consensus::RaftPeerPB;
using kudu::consensus::StartTabletCopyRequestPB;
using kudu::consensus::StartTabletCopyResponsePB;
using kudu::rpc::RpcController;
using kudu::server::ServerStatusPB;
using kudu::tablet::TabletStatusPB;
using kudu::tserver::DeleteTabletRequestPB;
using kudu::tserver::DeleteTabletResponsePB;
using kudu::tserver::ListTabletsRequestPB;
using kudu::tserver::ListTabletsResponsePB;
using kudu::tserver::NewScanRequestPB;
using kudu::tserver::ScanRequestPB;
using kudu::tserver::ScanResponsePB;
using kudu::tserver::TabletServer;
using kudu::tserver::TabletServerAdminServiceProxy;
using kudu::tserver::TabletServerErrorPB;
using kudu::tserver::TabletServerServiceProxy;
using std::cerr;
using std::cout;
using std::endl;
using std::string;
using std::unique_ptr;
using std::unordered_set;
using std::vector;
namespace kudu {
namespace tools {
// ReplicaDumper is a thin wrapper whose only job is to give Dump() a class
// name that KuduSchema and KuduScanBatch can befriend.
class ReplicaDumper {
 public:
  // Scans tablet 'tablet_id' through 'proxy', projecting every column of
  // 'schema', and writes each returned row to stdout on its own line.
  //
  // Returns a non-OK status if the projection cannot be serialized, if a
  // Scan() RPC fails, if a batch cannot be decoded, or if the server's
  // response carries an error (reported as an IOError).
  static Status Dump(const Schema& schema,
                     const string& tablet_id,
                     TabletServerServiceProxy* proxy) {
    KuduSchema client_schema(schema);
    ScanRequestPB scan_req;
    ScanResponsePB scan_resp;

    // Set up the initial scan request.
    // A READ_LATEST scan is used because the tablet being scanned may have
    // lost its majority, in which case snapshot scans are not possible.
    // TODO(dalves) When KUDU-1704 is in change this to perform stale snapshot
    // reads, which can be ordered.
    NewScanRequestPB* new_scan = scan_req.mutable_new_scan_request();
    RETURN_NOT_OK(SchemaToColumnPBs(
        schema, new_scan->mutable_projected_columns(),
        SCHEMA_PB_WITHOUT_IDS | SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES));
    new_scan->set_tablet_id(tablet_id);
    new_scan->set_cache_blocks(false);

    // The first iteration always runs; afterwards the loop continues for as
    // long as the latest response advertises more results. A 'continue' in
    // the body still goes through that check.
    for (bool more_results = true;
         more_results;
         more_results = scan_resp.has_more_results()) {
      RpcController controller;
      controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
      RETURN_NOT_OK_PREPEND(proxy->Scan(scan_req, &scan_resp, &controller),
                            "Scan() failed");
      if (scan_resp.has_error()) {
        return Status::IOError("Failed to read: ",
                               pb_util::SecureShortDebugString(scan_resp.error()));
      }

      // Once the server hands back a scanner ID, switch the request from
      // "open a new scan" to "continue the scan with this ID".
      if (scan_resp.has_scanner_id()) {
        scan_req.set_scanner_id(scan_resp.scanner_id());
        scan_req.clear_new_scan_request();
      }
      scan_req.set_call_seq_id(scan_req.call_seq_id() + 1);

      // A response without a data section has no rows to print.
      if (!scan_resp.has_data()) {
        continue;
      }

      KuduScanBatch::Data batch;
      RETURN_NOT_OK(batch.Reset(&controller,
                                &schema,
                                &client_schema,
                                client::KuduScanner::NO_FLAGS,
                                &scan_resp));
      vector<KuduRowResult> decoded_rows;
      batch.ExtractRows(&decoded_rows);
      for (const auto& row : decoded_rows) {
        cout << row.ToString() << endl;
      }
    }
    return Status::OK();
  }
};
namespace {
// Names of the positional command-line arguments specific to the actions in
// this file. They are used both when registering parameters in
// BuildRemoteReplicaMode() and when looking values up in the RunnerContext.

// Required argument of the "delete" action.
constexpr const char* const kReasonArg = "reason";
// Required arguments of the "copy" action: source and destination servers.
constexpr const char* const kSrcAddressArg = "src_address";
constexpr const char* const kDstAddressArg = "dst_address";
// Variadic argument of the "unsafe_change_config" action and its help text.
constexpr const char* const kPeerUUIDsArg = "peer uuids";
constexpr const char* const kPeerUUIDsArgDesc =
"List of peer uuids to be part of new config, separated by whitespace";
// Issues a ListTablets RPC through 'proxy' and replaces the contents of
// 'replicas' with the status-and-schema entries found in the response.
//
// Returns the RPC failure, or the error embedded in the response, if any.
Status GetReplicas(TabletServerServiceProxy* proxy,
                   vector<ListTabletsResponsePB::StatusAndSchemaPB>* replicas) {
  ListTabletsRequestPB list_req;
  ListTabletsResponsePB list_resp;
  RpcController controller;
  controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));

  // need_schema_info is intentionally never set to false here, not even when
  // FLAGS_include_schema=false: decoding each replica's partition requires
  // the schema, and partition information is nearly always worth showing.
  RETURN_NOT_OK(proxy->ListTablets(list_req, &list_resp, &controller));
  if (list_resp.has_error()) {
    return StatusFromPB(list_resp.error().status());
  }

  const auto& entries = list_resp.status_and_schema();
  replicas->assign(entries.begin(), entries.end());
  return Status::OK();
}
// Implements the "check" action: lists the replicas of the tablet server
// named by the tserver-address argument and verifies that each one is either
// running or tombstoned.
//
// Every offending replica is reported on stderr. Returns OK (after printing a
// confirmation on stdout) when there are none, IllegalState otherwise.
Status CheckReplicas(const RunnerContext& context) {
  const string& tserver_address =
      FindOrDie(context.required_args, kTServerAddressArg);

  unique_ptr<TabletServerServiceProxy> tserver_proxy;
  RETURN_NOT_OK(BuildProxy(tserver_address,
                           tserver::TabletServer::kDefaultPort,
                           &tserver_proxy));

  vector<ListTabletsResponsePB::StatusAndSchemaPB> replica_list;
  RETURN_NOT_OK(GetReplicas(tserver_proxy.get(), &replica_list));

  bool found_not_running = false;
  for (const auto& replica : replica_list) {
    const TabletStatusPB& status = replica.tablet_status();
    const bool is_running = status.state() == tablet::RUNNING;
    // A tombstoned replica is allowed to be in a non-running state.
    const bool is_tombstoned =
        status.tablet_data_state() == tablet::TABLET_DATA_TOMBSTONED;
    if (is_running || is_tombstoned) {
      continue;
    }
    cerr << "Tablet id: " << status.tablet_id() << " is "
         << tablet::TabletStatePB_Name(status.state()) << endl;
    found_not_running = true;
  }

  if (found_not_running) {
    return Status::IllegalState("Not all tablet replicas are running");
  }
  cout << "All tablet replicas are running" << endl;
  return Status::OK();
}
// Implements the "delete" action: asks the tablet server named by the
// tserver-address argument to tombstone the replica of the given tablet,
// recording the user-supplied reason.
//
// Returns a non-OK status if the server status lookup, proxy construction or
// DeleteTablet RPC fails; an error in the RPC response becomes an IOError.
Status DeleteReplica(const RunnerContext& context) {
  const auto& args = context.required_args;
  const string& tserver_address = FindOrDie(args, kTServerAddressArg);
  const string& tablet_id = FindOrDie(args, kTabletIdArg);
  const string& reason = FindOrDie(args, kReasonArg);

  // The server's permanent UUID is needed to address the request.
  ServerStatusPB tserver_status;
  RETURN_NOT_OK(GetServerStatus(tserver_address,
                                tserver::TabletServer::kDefaultPort,
                                &tserver_status));

  unique_ptr<TabletServerAdminServiceProxy> admin_proxy;
  RETURN_NOT_OK(BuildProxy(tserver_address,
                           tserver::TabletServer::kDefaultPort,
                           &admin_proxy));

  DeleteTabletRequestPB delete_req;
  DeleteTabletResponsePB delete_resp;
  RpcController controller;
  controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
  delete_req.set_tablet_id(tablet_id);
  delete_req.set_dest_uuid(tserver_status.node_instance().permanent_uuid());
  delete_req.set_reason(reason);
  delete_req.set_delete_type(tablet::TABLET_DATA_TOMBSTONED);

  RETURN_NOT_OK_PREPEND(
      admin_proxy->DeleteTablet(delete_req, &delete_resp, &controller),
      "DeleteTablet() failed");
  if (!delete_resp.has_error()) {
    return Status::OK();
  }
  return Status::IOError("Failed to delete tablet: ",
                         pb_util::SecureShortDebugString(delete_resp.error()));
}
// Implements the "dump" action: finds the schema of the requested tablet
// among the replicas hosted by the tablet server and then prints the
// replica's rows via ReplicaDumper::Dump().
//
// Returns NotFound if the tablet server lists no replica with that tablet ID.
Status DumpReplica(const RunnerContext& context) {
  const string& tserver_address =
      FindOrDie(context.required_args, kTServerAddressArg);
  const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);

  unique_ptr<TabletServerServiceProxy> tserver_proxy;
  RETURN_NOT_OK(BuildProxy(tserver_address,
                           tserver::TabletServer::kDefaultPort,
                           &tserver_proxy));

  vector<ListTabletsResponsePB::StatusAndSchemaPB> replica_list;
  RETURN_NOT_OK(GetReplicas(tserver_proxy.get(), &replica_list));

  // Deserialize the schema of the first replica whose tablet ID matches.
  Schema tablet_schema;
  for (const auto& replica : replica_list) {
    if (replica.tablet_status().tablet_id() != tablet_id) {
      continue;
    }
    RETURN_NOT_OK(SchemaFromPB(replica.schema(), &tablet_schema));
    break;
  }

  // The schema stays uninitialized when no replica matched.
  if (!tablet_schema.initialized()) {
    return Status::NotFound("cannot find replica", tablet_id);
  }
  return ReplicaDumper::Dump(tablet_schema, tablet_id, tserver_proxy.get());
}
// Implements the "list" action: prints a description of each replica hosted
// by the tablet server named by the tserver-address argument.
//
// When --table_name is non-empty only replicas of that table are shown; when
// --tablets is non-empty only replicas whose tablet ID appears in that
// comma-separated list are shown. The schema is printed only if
// --include_schema is set.
Status ListReplicas(const RunnerContext& context) {
  const string& tserver_address =
      FindOrDie(context.required_args, kTServerAddressArg);

  unique_ptr<TabletServerServiceProxy> tserver_proxy;
  RETURN_NOT_OK(BuildProxy(tserver_address,
                           tserver::TabletServer::kDefaultPort,
                           &tserver_proxy));

  vector<ListTabletsResponsePB::StatusAndSchemaPB> replica_list;
  RETURN_NOT_OK(GetReplicas(tserver_proxy.get(), &replica_list));

  unordered_set<string> wanted_tablet_ids = strings::Split(FLAGS_tablets, ",");
  const bool filter_by_table = !FLAGS_table_name.empty();
  const bool filter_by_tablet = !FLAGS_tablets.empty();

  for (const auto& replica : replica_list) {
    const TabletStatusPB& status = replica.tablet_status();

    // Apply the optional filters.
    if (filter_by_table && status.table_name() != FLAGS_table_name) {
      continue;
    }
    if (filter_by_tablet &&
        !ContainsKey(wanted_tablet_ids, status.tablet_id())) {
      continue;
    }

    // Decode everything up front so that a malformed entry is reported
    // before anything about this replica is printed.
    Schema tablet_schema;
    RETURN_NOT_OK_PREPEND(
        SchemaFromPB(replica.schema(), &tablet_schema),
        "Unable to deserialize schema from " + tserver_address);
    PartitionSchema part_schema;
    RETURN_NOT_OK_PREPEND(
        PartitionSchema::FromPB(replica.partition_schema(), tablet_schema,
                                &part_schema),
        "Unable to deserialize partition schema from " + tserver_address);
    Partition part;
    Partition::FromPB(status.partition(), &part);

    const string& state_name = tablet::TabletStatePB_Name(status.state());
    cout << "Tablet id: " << status.tablet_id() << endl;
    cout << "State: " << state_name << endl;
    cout << "Last status: " << status.last_status() << endl;
    if (replica.has_role()) {
      cout << "Role: " << RaftPeerPB::Role_Name(replica.role()) << endl;
    }
    cout << "Table name: " << status.table_name() << endl;
    cout << "Partition: "
         << part_schema.PartitionDebugString(part, tablet_schema) << endl;
    if (status.has_estimated_on_disk_size()) {
      cout << "Estimated on disk size: "
           << HumanReadableNumBytes::ToString(status.estimated_on_disk_size())
           << endl;
    }

    const string& data_state_name =
        tablet::TabletDataState_Name(status.tablet_data_state());
    cout << "Data state: " << data_state_name << endl;

    if (status.data_dirs_size() == 0) {
      cout << "Data dirs: <not available>" << endl;
    } else {
      cout << "Data dirs: " << JoinStrings(status.data_dirs(), ", ") << endl;
    }

    if (FLAGS_include_schema) {
      cout << "Schema: " << tablet_schema.ToString() << endl;
    }
  }
  return Status::OK();
}
// Implements the "copy" action: asks the tablet server at 'dst_address' to
// copy the replica of 'tablet_id' from the tablet server at 'src_address'.
//
// Both servers are first queried for their status: the destination's
// permanent UUID addresses the request, and the source's permanent UUID and
// first bound RPC address tell the destination where to copy from.
//
// Returns a non-OK status if either server status lookup fails, if the source
// server reports no bound RPC address, if the proxy cannot be built, if the
// StartTabletCopy RPC fails (including hitting --timeout_ms), or if the
// response carries an error.
Status CopyReplica(const RunnerContext& context) {
  const string& src_address = FindOrDie(context.required_args, kSrcAddressArg);
  const string& dst_address = FindOrDie(context.required_args, kDstAddressArg);
  const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);

  ServerStatusPB dst_status;
  RETURN_NOT_OK(GetServerStatus(dst_address, TabletServer::kDefaultPort,
                                &dst_status));
  ServerStatusPB src_status;
  RETURN_NOT_OK(GetServerStatus(src_address, TabletServer::kDefaultPort,
                                &src_status));

  // Indexing element 0 of an empty repeated field is invalid, so bail out
  // with a clear error if the source did not report any RPC address.
  if (src_status.bound_rpc_addresses_size() == 0) {
    return Status::IllegalState(
        "source tablet server did not report any bound RPC addresses",
        src_address);
  }

  unique_ptr<ConsensusServiceProxy> proxy;
  RETURN_NOT_OK(BuildProxy(dst_address, TabletServer::kDefaultPort, &proxy));

  StartTabletCopyRequestPB req;
  StartTabletCopyResponsePB resp;
  RpcController rpc;
  // Bound the call like every other RPC issued by this tool; the warning
  // logged below already tells the user that the request may time out.
  rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
  req.set_dest_uuid(dst_status.node_instance().permanent_uuid());
  req.set_tablet_id(tablet_id);
  req.set_copy_peer_uuid(src_status.node_instance().permanent_uuid());
  *req.mutable_copy_peer_addr() = src_status.bound_rpc_addresses(0);

  // Provide a force option if the destination tablet server has the
  // replica otherwise tablet-copy will fail.
  if (FLAGS_force_copy) {
    req.set_caller_term(std::numeric_limits<int64_t>::max());
  }

  LOG(INFO) << "Sending copy replica request:\n" << pb_util::SecureDebugString(req);
  LOG(WARNING) << "NOTE: this copy may happen asynchronously "
               << "and may timeout if the tablet size is large. Watch the logs on "
               << "the target tablet server for indication of progress.";

  RETURN_NOT_OK(proxy->StartTabletCopy(req, &resp, &rpc));
  if (resp.has_error()) {
    RETURN_NOT_OK_PREPEND(
        StatusFromPB(resp.error().status()),
        strings::Substitute("Remote server returned error code $0",
                            TabletServerErrorPB::Code_Name(resp.error().code())));
  }
  return Status::OK();
}
// Implements the "unsafe_change_config" action: sends the replica of the
// given tablet on the given tablet server an UnsafeChangeConfig request whose
// new Raft config consists of the peer UUIDs passed as variadic arguments.
//
// Returns InvalidArgument when no peer UUID was supplied, and otherwise
// propagates any failure of the server status lookup, proxy construction,
// RPC, or the error carried by the response.
Status UnsafeChangeConfig(const RunnerContext& context) {
  // Gather the arguments and look up the target server's identity.
  const string& tserver_address =
      FindOrDie(context.required_args, kTServerAddressArg);
  const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);

  ServerStatusPB tserver_status;
  RETURN_NOT_OK(GetServerStatus(tserver_address, TabletServer::kDefaultPort,
                                &tserver_status));

  const auto& peer_uuids = context.variadic_args;
  if (peer_uuids.empty()) {
    return Status::InvalidArgument("No peer UUIDs specified for the new config");
  }

  // Each peer of the new config is described by its permanent UUID only.
  RaftConfigPB proposed_config;
  for (const auto& uuid : peer_uuids) {
    proposed_config.add_peers()->set_permanent_uuid(uuid);
  }

  // Ask the tablet server to replace the replica's config.
  unique_ptr<ConsensusServiceProxy> consensus_proxy;
  RETURN_NOT_OK(BuildProxy(tserver_address, TabletServer::kDefaultPort,
                           &consensus_proxy));

  consensus::UnsafeChangeConfigRequestPB change_req;
  consensus::UnsafeChangeConfigResponsePB change_resp;
  RpcController controller;
  controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
  change_req.set_dest_uuid(tserver_status.node_instance().permanent_uuid());
  change_req.set_tablet_id(tablet_id);
  change_req.set_caller_id("kudu-tools");
  change_req.mutable_new_config()->CopyFrom(proposed_config);

  RETURN_NOT_OK(
      consensus_proxy->UnsafeChangeConfig(change_req, &change_resp, &controller));
  if (change_resp.has_error()) {
    return StatusFromPB(change_resp.error().status());
  }
  return Status::OK();
}
} // anonymous namespace
// Builds the "remote_replica" mode of the CLI and registers its six actions:
// check, copy, delete, dump, list and unsafe_change_config.
unique_ptr<Mode> BuildRemoteReplicaMode() {
  // "check": verify that every replica on a tablet server is running.
  unique_ptr<Action> check_action =
      TServerActionBuilder("check", &CheckReplicas)
      .Description("Check if all tablet replicas on a Kudu tablet server are "
                   "running. Tombstoned replica do not count as not running, "
                   "because they are just records of the previous existence of "
                   "a replica.")
      .Build();

  // "copy": takes explicit source and destination addresses.
  unique_ptr<Action> copy_action =
      RpcActionBuilder("copy", &CopyReplica)
      .Description("Copy a tablet replica from one Kudu Tablet Server to another")
      .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
      .AddRequiredParameter({ kSrcAddressArg, kTServerAddressDesc })
      .AddRequiredParameter({ kDstAddressArg, kTServerAddressDesc })
      .AddOptionalParameter("force_copy")
      .Build();

  // "delete": requires a tablet ID and a reason.
  unique_ptr<Action> delete_action =
      TServerActionBuilder("delete", &DeleteReplica)
      .Description("Delete a tablet replica from a Kudu Tablet Server")
      .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
      .AddRequiredParameter({ kReasonArg, "Reason for deleting the replica" })
      .Build();

  // "dump": print the rows of one replica.
  unique_ptr<Action> dump_action =
      TServerActionBuilder("dump", &DumpReplica)
      .Description("Dump the data of a tablet replica on a Kudu Tablet Server")
      .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
      .Build();

  // "list": describe replicas, optionally filtered by table or tablet IDs.
  unique_ptr<Action> list_action =
      TServerActionBuilder("list", &ListReplicas)
      .Description("List all tablet replicas on a Kudu Tablet Server")
      .AddOptionalParameter("include_schema")
      .AddOptionalParameter("table_name")
      .AddOptionalParameter("tablets",
                            string(""),
                            string("Comma-separated list of tablet IDs used to "
                                   "filter the list of replicas"))
      .Build();

  // "unsafe_change_config": tablet ID followed by a variadic list of peers.
  unique_ptr<Action> change_config_action =
      TServerActionBuilder("unsafe_change_config", &UnsafeChangeConfig)
      .Description("Force the specified replica to adopt a new Raft config")
      .ExtraDescription("This tool is useful when a config change is "
                        "necessary because a tablet cannot make progress with "
                        "its current Raft configuration (e.g. to evict "
                        "followers when a majority is unavailable).\n\nNote: "
                        "The members of the new Raft config must be a subset "
                        "of (or the same as) the members of the existing "
                        "committed Raft config.")
      .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
      .AddRequiredVariadicParameter({ kPeerUUIDsArg, kPeerUUIDsArgDesc })
      .Build();

  // Assemble the mode; actions are added in the same order as above.
  return ModeBuilder("remote_replica")
      .Description("Operate on remote tablet replicas on a Kudu Tablet Server")
      .AddAction(std::move(check_action))
      .AddAction(std::move(copy_action))
      .AddAction(std::move(delete_action))
      .AddAction(std::move(dump_action))
      .AddAction(std::move(list_action))
      .AddAction(std::move(change_config_action))
      .Build();
}
} // namespace tools
} // namespace kudu