// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <cstdint>
#include <functional>
#include <iostream>
#include <limits>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/client/client.h"
#include "kudu/client/row_result.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/scanner-internal.h"
#include "kudu/client/schema.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/partition.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/human_readable.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/server/server_base.pb.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tools/tool_action.h"
#include "kudu/tools/tool_action_common.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/tserver/tserver_admin.proxy.h"
#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/util/monotime.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
// --force_copy: read by the 'copy' action (CopyReplica). Forcing is needed
// when the destination tablet server already hosts a replica of the tablet.
DEFINE_bool(force_copy,
            false,
            "Force the copy when the destination tablet server has this replica");

// --include_schema: read by the 'list' action (ListReplicas) to decide
// whether each replica's schema is printed.
DEFINE_bool(include_schema,
            true,
            "Whether to include the schema of each replica");

// Defined in ksck; only declared here so this file can read FLAGS_timeout_ms.
DECLARE_int64(timeout_ms);
namespace kudu {
namespace tools {
using client::KuduRowResult;
using client::KuduScanBatch;
using client::KuduSchema;
using consensus::ConsensusServiceProxy;
using consensus::RaftConfigPB;
using consensus::RaftPeerPB;
using consensus::StartTabletCopyRequestPB;
using consensus::StartTabletCopyResponsePB;
using rpc::RpcController;
using server::ServerStatusPB;
using std::cerr;
using std::cout;
using std::endl;
using std::string;
using std::unique_ptr;
using std::unordered_set;
using std::vector;
using tablet::TabletStatusPB;
using tserver::DeleteTabletRequestPB;
using tserver::DeleteTabletResponsePB;
using tserver::ListTabletsRequestPB;
using tserver::ListTabletsResponsePB;
using tserver::NewScanRequestPB;
using tserver::ScanRequestPB;
using tserver::ScanResponsePB;
using tserver::TabletServer;
using tserver::TabletServerErrorPB;
using tserver::TabletServerAdminServiceProxy;
using tserver::TabletServerServiceProxy;
// This class only exists so that Dump() can easily be friended with
// KuduSchema and KuduScanBatch.
//
// NOTE(review): no access specifier is visible before Dump() below, which
// would make it private, yet DumpReplica() calls ReplicaDumper::Dump().
// This copy also shows no closing braces for Dump() or the class. Confirm
// both against the canonical source.
class ReplicaDumper {
// Scans the replica 'tablet_id' (whose table has schema 'schema') through
// 'proxy' and prints every returned row to stdout, one row per line.
//
// Returns a non-OK Status if a Scan() RPC fails or the server reports an
// error in its response. Returns Status::OK() once the server stops
// reporting more results.
static Status Dump(const Schema& schema,
const string& tablet_id,
TabletServerServiceProxy* proxy) {
KuduSchema client_schema(schema);
ScanRequestPB req;
ScanResponsePB resp;
// Scan and dump the tablet.
// Note that we do a READ_LATEST scan because we might be scanning a tablet
// that lost its majority and therefore cannot serve snapshot scans.
// TODO(dalves) Once KUDU-1704 is in, change this to perform stale snapshot
// reads, which can be ordered.
//
// NOTE(review): the statement after the next one is a bare argument list
// with no enclosing call (presumably the projection setup). Confirm against
// the canonical source.
NewScanRequestPB* new_req = req.mutable_new_scan_request();
schema, new_req->mutable_projected_columns(),
do {
RpcController rpc;
RETURN_NOT_OK_PREPEND(proxy->Scan(req, &resp, &rpc), "Scan() failed");
if (resp.has_error()) {
return Status::IOError("Failed to read: ",
// The first response has a scanner ID. We use this for all subsequent
// responses.
if (resp.has_scanner_id()) {
// Advance the call sequence id for the next continuation request
// (presumably so the server can order continuations; confirm).
req.set_call_seq_id(req.call_seq_id() + 1);
// Nothing to process from this scan result.
if (!resp.has_data()) {
// NOTE(review): as shown, 'results' is never filled from 'resp' and 'rows'
// is never filled from 'results', so the loop below prints nothing.
// Confirm that the row-decoding step exists in the canonical source.
KuduScanBatch::Data results;
vector<KuduRowResult> rows;
for (const auto& r : rows) {
cout << r.ToString() << endl;
} while (resp.has_more_results());
return Status::OK();
namespace {
// Names and help strings for the positional arguments of the remote_replica
// actions.

// Argument naming the reason given when deleting a replica.
constexpr const char* kReasonArg = "reason";

// Tablet server address argument; several actions require it.
constexpr const char* kTServerAddressArg = "tserver_address";
constexpr const char* kTServerAddressDesc =
    "Address of a Kudu Tablet Server of form 'hostname:port'. "
    "Port may be omitted if the Tablet Server is bound to the default port.";

// Source and destination tablet servers for the 'copy' action.
constexpr const char* kSrcAddressArg = "src_address";
constexpr const char* kDstAddressArg = "dst_address";

// Variadic list of peer UUIDs for the 'unsafe_change_config' action.
constexpr const char* kPeerUUIDsArg = "peer uuids";
constexpr const char* kPeerUUIDsArgDesc =
    "List of peer uuids to be part of new config";
// Fetches the status and schema of every tablet replica hosted by the
// tablet server behind 'proxy', using a ListTablets RPC.
//
// On success, '*replicas' is overwritten with one entry per replica reported
// by the server. If the RPC fails, or the server reports an error in the
// response, that error is returned and '*replicas' is left untouched.
Status GetReplicas(TabletServerServiceProxy* proxy,
                   vector<ListTabletsResponsePB::StatusAndSchemaPB>* replicas) {
  ListTabletsRequestPB req;
  ListTabletsResponsePB resp;
  RpcController rpc;
  // Bound the call by the tool-wide --timeout_ms flag so that an
  // unresponsive tablet server cannot hang the tool indefinitely.
  rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));

  // Even with FLAGS_include_schema=false, don't set need_schema_info=false
  // in the request. The schema is still needed to decode the partition of
  // each replica, and the partition information is almost always useful.
  RETURN_NOT_OK(proxy->ListTablets(req, &resp, &rpc));
  if (resp.has_error()) {
    return StatusFromPB(resp.error().status());
  }

  // Copy the per-replica entries out of the response. Without this step the
  // output parameter stays empty and every caller sees zero replicas.
  replicas->assign(resp.status_and_schema().begin(),
                   resp.status_and_schema().end());
  return Status::OK();
}
// Implements the 'check' action. It lists the replicas on the tablet server
// named by the 'tserver_address' argument and flags every replica that is
// neither RUNNING nor tombstoned.
//
// Every replica is examined rather than stopping at the first failure, so
// all non-running replicas are reported in one invocation. Each offending
// tablet id and state is printed to stderr.
//
// Returns Status::OK() and prints a confirmation to stdout when every
// replica passes. Otherwise returns Status::IllegalState.
//
// NOTE(review): in this copy the BuildProxy(...) call is missing its
// remaining argument(s) and closing parentheses, and several closing braces
// are absent. Confirm against the canonical source.
Status CheckReplicas(const RunnerContext& context) {
const string& address = FindOrDie(context.required_args, kTServerAddressArg);
unique_ptr<TabletServerServiceProxy> proxy;
RETURN_NOT_OK(BuildProxy(address, tserver::TabletServer::kDefaultPort,
vector<ListTabletsResponsePB::StatusAndSchemaPB> replicas;
RETURN_NOT_OK(GetReplicas(proxy.get(), &replicas));
bool all_running = true;
for (const auto& r : replicas) {
const TabletStatusPB& rs = r.tablet_status();
// A replica that isn't running is fine as long as it is tombstoned.
if (rs.state() != tablet::RUNNING &&
rs.tablet_data_state() != tablet::TABLET_DATA_TOMBSTONED) {
cerr << "Tablet id: " << rs.tablet_id() << " is "
<< tablet::TabletStatePB_Name(rs.state()) << endl;
all_running = false;
if (all_running) {
cout << "All tablet replicas are running" << endl;
return Status::OK();
return Status::IllegalState("Not all tablet replicas are running");
// Implements the 'delete' action. It asks the tablet server named by the
// 'tserver_address' argument to delete replica 'tablet_id', using the
// TabletServerAdminService DeleteTablet RPC.
//
// Returns the RPC error (prefixed with "DeleteTablet() failed") if the call
// itself fails. Returns an IOError prefixed with "Failed to delete tablet: "
// if the server reports an error in its response.
//
// NOTE(review): in this copy 'tablet_id', 'reason' and the fetched server
// 'status' (presumably used for the destination UUID) are never visibly
// placed into 'req'. The GetServerStatus/BuildProxy calls and the IOError
// return are also truncated. Confirm against the canonical source.
Status DeleteReplica(const RunnerContext& context) {
const string& address = FindOrDie(context.required_args, kTServerAddressArg);
const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
const string& reason = FindOrDie(context.required_args, kReasonArg);
ServerStatusPB status;
RETURN_NOT_OK(GetServerStatus(address, tserver::TabletServer::kDefaultPort,
unique_ptr<TabletServerAdminServiceProxy> proxy;
RETURN_NOT_OK(BuildProxy(address, tserver::TabletServer::kDefaultPort,
DeleteTabletRequestPB req;
DeleteTabletResponsePB resp;
RpcController rpc;
RETURN_NOT_OK_PREPEND(proxy->DeleteTablet(req, &resp, &rpc),
"DeleteTablet() failed");
if (resp.has_error()) {
return Status::IOError("Failed to delete tablet: ",
return Status::OK();
// Implements the 'dump' action. It prints every row of replica 'tablet_id'
// on the tablet server named by the 'tserver_address' argument.
//
// The replica's schema is taken from the server's ListTablets response.
// Returns Status::NotFound if the server does not report that tablet.
// Otherwise returns the result of ReplicaDumper::Dump().
//
// NOTE(review): the BuildProxy(...) call and the closing braces of the loop
// and 'if' statements are truncated in this copy. Confirm against the
// canonical source.
Status DumpReplica(const RunnerContext& context) {
const string& address = FindOrDie(context.required_args, kTServerAddressArg);
const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
unique_ptr<TabletServerServiceProxy> proxy;
RETURN_NOT_OK(BuildProxy(address, tserver::TabletServer::kDefaultPort,
vector<ListTabletsResponsePB::StatusAndSchemaPB> replicas;
RETURN_NOT_OK(GetReplicas(proxy.get(), &replicas));
// Stays uninitialized unless a replica with a matching tablet id is found.
Schema schema;
for (const auto& r : replicas) {
if (r.tablet_status().tablet_id() == tablet_id) {
RETURN_NOT_OK(SchemaFromPB(r.schema(), &schema));
if (!schema.initialized()) {
return Status::NotFound("cannot find replica", tablet_id);
return ReplicaDumper::Dump(schema, tablet_id, proxy.get());
// Implements the 'list' action. For each replica on the tablet server named
// by the 'tserver_address' argument, it prints:
//   - tablet id, state, table name and partition;
//   - the estimated on-disk size, when the server reports it;
//   - the data dirs, or "<not available>";
//   - the schema, when --include_schema is set.
//
// Replicas can be narrowed with --table_name and with --tablets, a
// comma-separated list of tablet ids.
//
// NOTE(review): in this copy the two filter 'if' statements have no visible
// body (presumably 'continue;'), and the schema/partition-schema decoding
// lines have lost their RETURN_NOT_OK_PREPEND( openers. The BuildProxy call
// and several closing braces are also truncated. Confirm against the
// canonical source.
Status ListReplicas(const RunnerContext& context) {
const string& address = FindOrDie(context.required_args, kTServerAddressArg);
unique_ptr<TabletServerServiceProxy> proxy;
RETURN_NOT_OK(BuildProxy(address, tserver::TabletServer::kDefaultPort,
vector<ListTabletsResponsePB::StatusAndSchemaPB> replicas;
RETURN_NOT_OK(GetReplicas(proxy.get(), &replicas));
// Only consulted when --tablets is non-empty (see the filter below).
unordered_set<string> tablet_ids = strings::Split(FLAGS_tablets, ",");
for (const auto& r : replicas) {
if (!FLAGS_table_name.empty() &&
r.tablet_status().table_name() != FLAGS_table_name) {
if (!FLAGS_tablets.empty() &&
!ContainsKey(tablet_ids, r.tablet_status().tablet_id())) {
// Decode the replica's schema; it is needed for the partition description
// below even when --include_schema is off.
Schema schema;
SchemaFromPB(r.schema(), &schema),
"Unable to deserialize schema from " + address);
PartitionSchema partition_schema;
PartitionSchema::FromPB(r.partition_schema(), schema, &partition_schema),
"Unable to deserialize partition schema from " + address);
const TabletStatusPB& rs = r.tablet_status();
Partition partition;
Partition::FromPB(rs.partition(), &partition);
const string& state = tablet::TabletStatePB_Name(rs.state());
cout << "Tablet id: " << rs.tablet_id() << endl;
cout << "State: " << state << endl;
cout << "Table name: " << rs.table_name() << endl;
cout << "Partition: "
<< partition_schema.PartitionDebugString(partition, schema) << endl;
if (rs.has_estimated_on_disk_size()) {
cout << "Estimated on disk size: "
<< HumanReadableNumBytes::ToString(rs.estimated_on_disk_size()) << endl;
if (rs.data_dirs_size() != 0) {
cout << "Data dirs: " << JoinStrings(rs.data_dirs(), ", ") << endl;
} else {
cout << "Data dirs: <not available>" << endl;
if (FLAGS_include_schema) {
cout << "Schema: " << schema.ToString() << endl;
return Status::OK();
// Implements the 'copy' action. It asks the tablet server at 'dst_address'
// to copy replica 'tablet_id' from the tablet server at 'src_address', using
// the ConsensusService StartTabletCopy RPC.
//
// The status of both servers is fetched first. The source's first bound RPC
// address is used as the copy peer address. As the logged warning says, the
// copy may proceed asynchronously and may time out for large tablets.
//
// NOTE(review): bound_rpc_addresses(0) assumes the source reports at least
// one bound address; there is no size check.
//
// NOTE(review): this copy also has gaps. Both GetServerStatus calls are
// truncated. 'tablet_id' and 'dst_status' are never visibly set on 'req'.
// The force_copy branch has no visible body. The error branch lacks its
// 'return'. Confirm against the canonical source.
Status CopyReplica(const RunnerContext& context) {
const string& src_address = FindOrDie(context.required_args, kSrcAddressArg);
const string& dst_address = FindOrDie(context.required_args, kDstAddressArg);
const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
ServerStatusPB dst_status;
RETURN_NOT_OK(GetServerStatus(dst_address, TabletServer::kDefaultPort,
ServerStatusPB src_status;
RETURN_NOT_OK(GetServerStatus(src_address, TabletServer::kDefaultPort,
unique_ptr<ConsensusServiceProxy> proxy;
RETURN_NOT_OK(BuildProxy(dst_address, TabletServer::kDefaultPort, &proxy));
StartTabletCopyRequestPB req;
StartTabletCopyResponsePB resp;
RpcController rpc;
*req.mutable_copy_peer_addr() = src_status.bound_rpc_addresses(0);
// If the destination tablet server already hosts this replica, tablet copy
// fails unless the request is forced (--force_copy).
if (FLAGS_force_copy) {
LOG(INFO) << "Sending copy replica request:\n" << pb_util::SecureDebugString(req);
LOG(WARNING) << "NOTE: this copy may happen asynchronously "
<< "and may timeout if the tablet size is large. Watch the logs on "
<< "the target tablet server for indication of progress.";
RETURN_NOT_OK(proxy->StartTabletCopy(req, &resp, &rpc));
if (resp.has_error()) {
strings::Substitute("Remote server returned error code $0",
return Status::OK();
// Implements the 'unsafe_change_config' action. It forces the replica of
// 'tablet_id' on the tablet server named by the 'tserver_address' argument
// to adopt a new Raft config built from the peer UUIDs given as variadic
// arguments. It uses the ConsensusService UnsafeChangeConfig RPC.
//
// Returns InvalidArgument if no peer UUIDs are given. Otherwise returns the
// RPC error, or the error reported in the server's response.
//
// NOTE(review): in this copy the loop creates 'new_peer' but never visibly
// sets its UUID from 'arg' or adds it to 'new_config'. 'tablet_id' and
// 'dst_status' are never visibly set on 'req'. The GetServerStatus call and
// several closing braces are truncated. Confirm against the canonical
// source.
Status UnsafeChangeConfig(const RunnerContext& context) {
// Parse and validate arguments.
const string& dst_address = FindOrDie(context.required_args, kTServerAddressArg);
const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
ServerStatusPB dst_status;
RETURN_NOT_OK(GetServerStatus(dst_address, TabletServer::kDefaultPort,
if (context.variadic_args.empty()) {
return Status::InvalidArgument("No peer UUIDs specified for the new config");
// Build the replacement config, one peer per UUID argument.
RaftConfigPB new_config;
for (const auto& arg : context.variadic_args) {
RaftPeerPB new_peer;
// Send a request to node dst_address to replace the config.
unique_ptr<ConsensusServiceProxy> proxy;
RETURN_NOT_OK(BuildProxy(dst_address, TabletServer::kDefaultPort, &proxy));
consensus::UnsafeChangeConfigRequestPB req;
consensus::UnsafeChangeConfigResponsePB resp;
RpcController rpc;
*req.mutable_new_config() = new_config;
RETURN_NOT_OK(proxy->UnsafeChangeConfig(req, &resp, &rpc));
if (resp.has_error()) {
return StatusFromPB(resp.error().status());
return Status::OK();
} // anonymous namespace
// Builds the 'remote_replica' tool mode. Its actions (check, copy, delete,
// dump, list, unsafe_change_config) operate on tablet replicas hosted by a
// remote Kudu tablet server.
//
// NOTE(review): in this copy none of the ActionBuilder chains is terminated
// (no '.Build();'), and the 'list' action contains an orphan string(...)
// fragment, presumably the help text of an optional 'tablets' parameter.
// The ModeBuilder chain adds no actions, is never finished, and the function
// has no closing brace. The 'check' description also says "Tombstoned
// replica do not count" (presumably "replicas"). Confirm against the
// canonical source.
unique_ptr<Mode> BuildRemoteReplicaMode() {
// check: verify that every replica on one tablet server is running.
unique_ptr<Action> check_replicas =
ActionBuilder("check", &CheckReplicas)
.Description("Check if all tablet replicas on a Kudu tablet server are "
"running. Tombstoned replica do not count as not running, "
"because they are just records of the previous existence of "
"a replica.")
.AddRequiredParameter({ kTServerAddressArg, kTServerAddressDesc })
// copy: copy a replica from a source to a destination tablet server.
unique_ptr<Action> copy_replica =
ActionBuilder("copy", &CopyReplica)
.Description("Copy a tablet replica from one Kudu Tablet Server to another")
.AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
.AddRequiredParameter({ kSrcAddressArg, kTServerAddressDesc })
.AddRequiredParameter({ kDstAddressArg, kTServerAddressDesc })
// delete: delete one replica, with a caller-supplied reason.
unique_ptr<Action> delete_replica =
ActionBuilder("delete", &DeleteReplica)
.Description("Delete a tablet replica from a Kudu Tablet Server")
.AddRequiredParameter({ kTServerAddressArg, kTServerAddressDesc })
.AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
.AddRequiredParameter({ kReasonArg, "Reason for deleting the replica" })
// dump: print the rows of one replica.
unique_ptr<Action> dump_replica =
ActionBuilder("dump", &DumpReplica)
.Description("Dump the data of a tablet replica on a Kudu Tablet Server")
.AddRequiredParameter({ kTServerAddressArg, kTServerAddressDesc })
.AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
// list: describe the replicas on one tablet server.
unique_ptr<Action> list =
ActionBuilder("list", &ListReplicas)
.Description("List all tablet replicas on a Kudu Tablet Server")
.AddRequiredParameter({ kTServerAddressArg, kTServerAddressDesc })
string("Comma-separated list of tablet IDs used to "
"filter the list of replicas"))
// unsafe_change_config: force a replica to adopt a new Raft config.
unique_ptr<Action> unsafe_change_config =
ActionBuilder("unsafe_change_config", &UnsafeChangeConfig)
.Description("Force the specified replica to adopt a new Raft config")
.ExtraDescription("This tool is useful when a config change is "
"necessary because a tablet cannot make progress with "
"its current Raft configuration (e.g. to evict "
"followers when a majority is unavailable).\n\nNote: "
"The members of the new Raft config must be a subset "
"of (or the same as) the members of the existing "
"committed Raft config.")
.AddRequiredParameter({ kTServerAddressArg, kTServerAddressDesc })
.AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
.AddRequiredVariadicParameter({ kPeerUUIDsArg, kPeerUUIDsArgDesc })
return ModeBuilder("remote_replica")
.Description("Operate on remote tablet replicas on a Kudu Tablet Server")
} // namespace tools
} // namespace kudu