| // Copyright 2015 Cloudera, Inc. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| // Tool to administer a cluster from the CLI. |
| |
| #include <boost/foreach.hpp> |
| #include <boost/optional.hpp> |
| #include <glog/logging.h> |
| #include <gflags/gflags.h> |
| #include <tr1/memory> |
| #include <iostream> |
| #include <strstream> |
| |
| #include "kudu/client/client.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/consensus/consensus.pb.h" |
| #include "kudu/consensus/consensus.proxy.h" |
| #include "kudu/gutil/strings/split.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/master/master.h" |
| #include "kudu/master/master.pb.h" |
| #include "kudu/master/master.proxy.h" |
| #include "kudu/tserver/tablet_server.h" |
| #include "kudu/util/env.h" |
| #include "kudu/util/flags.h" |
| #include "kudu/util/logging.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/string_case.h" |
| #include "kudu/rpc/messenger.h" |
| #include "kudu/rpc/rpc_controller.h" |
| |
| DEFINE_string(master_addresses, "localhost", |
| "Comma-separated list of Kudu Master server addresses"); |
| DEFINE_int64(timeout_ms, 1000 * 60, "RPC timeout in milliseconds"); |
| |
| #define EXIT_NOT_OK_PREPEND(status, msg) \ |
| do { \ |
| Status _s = (status); \ |
| if (PREDICT_FALSE(!_s.ok())) { \ |
| std::cerr << _s.CloneAndPrepend(msg).ToString() << std::endl; \ |
| google::ShowUsageWithFlagsRestrict(g_progname, __FILE__); \ |
| exit(1); \ |
| } \ |
| } while (0) |
| |
| namespace kudu { |
| namespace tools { |
| |
| using std::ostringstream; |
| using std::string; |
| using std::tr1::shared_ptr; |
| using std::vector; |
| |
| using google::protobuf::RepeatedPtrField; |
| |
| using client::KuduClient; |
| using client::KuduClientBuilder; |
| using client::KuduTabletServer; |
| using consensus::ConsensusServiceProxy; |
| using consensus::RaftPeerPB; |
| using master::ListTabletServersRequestPB;; |
| using master::ListTabletServersResponsePB; |
| using master::MasterServiceProxy; |
| using master::TabletLocationsPB; |
| using master::TSInfoPB; |
| using rpc::Messenger; |
| using rpc::MessengerBuilder; |
| using rpc::RpcController; |
| using strings::Split; |
| using strings::Substitute; |
| |
| const char* const kChangeConfigOp = "change_config"; |
| static const char* g_progname = NULL; |
| |
| class ClusterAdminClient { |
| public: |
| // Creates an admin client for host/port combination e.g., |
| // "localhost" or "127.0.0.1:7050". |
| ClusterAdminClient(const std::string& addrs, int64_t timeout_millis); |
| |
| // Initialized the client and connects to the specified tablet |
| // server. |
| Status Init(); |
| |
| // Change the configuration of the specified tablet. |
| Status ChangeConfig(const string& tablet_id, |
| const string& change_type, |
| const string& peer_uuid, |
| const boost::optional<string>& member_type); |
| |
| private: |
| // Fetch the locations of the replicas for a given tablet from the Master. |
| Status GetTabletLocations(const std::string& tablet_id, |
| TabletLocationsPB* locations); |
| |
| // Fetch information about the location of the tablet leader from the Master. |
| Status GetTabletLeader(const std::string& tablet_id, TSInfoPB* ts_info); |
| |
| // Fetch the latest list of tablet servers from the Master. |
| Status ListTabletServers(RepeatedPtrField<ListTabletServersResponsePB::Entry>* servers); |
| |
| // Look up the RPC address of the server with the specified UUID from the Master. |
| Status GetFirstRpcAddressForTS(const std::string& uuid, HostPort* hp); |
| |
| const std::string master_addr_list_; |
| const MonoDelta timeout_; |
| |
| bool initted_; |
| shared_ptr<rpc::Messenger> messenger_; |
| gscoped_ptr<MasterServiceProxy> master_proxy_; |
| |
| DISALLOW_COPY_AND_ASSIGN(ClusterAdminClient); |
| }; |
| |
| ClusterAdminClient::ClusterAdminClient(const string& addrs, int64_t timeout_millis) |
| : master_addr_list_(addrs), |
| timeout_(MonoDelta::FromMilliseconds(timeout_millis)), |
| initted_(false) { |
| } |
| |
| Status ClusterAdminClient::Init() { |
| CHECK(!initted_); |
| |
| // Build master proxy. |
| // TODO: Support multi-master by adding replica lookup support to the client. |
| vector<string> master_addr_strings = Split(master_addr_list_, ","); |
| if (master_addr_strings.size() != 1) { |
| return Status::InvalidArgument("This tool does not yet support multiple masters. " |
| "Please specify only the leader master address in " |
| "-master_addresses"); |
| } |
| |
| HostPort master_hostport; |
| RETURN_NOT_OK(master_hostport.ParseString(master_addr_strings[0], |
| master::Master::kDefaultPort)); |
| MessengerBuilder builder("kudu-admin"); |
| RETURN_NOT_OK(builder.Build(&messenger_)); |
| vector<Sockaddr> master_addrs; |
| RETURN_NOT_OK(master_hostport.ResolveAddresses(&master_addrs)); |
| CHECK(!master_addrs.empty()) << "Unable to resolve IP address for master host: " |
| << master_hostport.ToString(); |
| master_proxy_.reset(new MasterServiceProxy(messenger_, master_addrs[0])); |
| |
| initted_ = true; |
| return Status::OK(); |
| } |
| |
| Status ClusterAdminClient::ChangeConfig(const string& tablet_id, |
| const string& change_type, |
| const string& peer_uuid, |
| const boost::optional<string>& member_type) { |
| CHECK(initted_); |
| |
| // Parse the change type. |
| consensus::ChangeConfigType cc_type = consensus::UNKNOWN_CHANGE; |
| string uppercase_change_type; |
| ToUpperCase(change_type, &uppercase_change_type); |
| if (!consensus::ChangeConfigType_Parse(uppercase_change_type, &cc_type) || |
| cc_type == consensus::UNKNOWN_CHANGE) { |
| return Status::InvalidArgument("Unsupported change_type", change_type); |
| } |
| |
| RaftPeerPB peer_pb; |
| peer_pb.set_permanent_uuid(peer_uuid); |
| |
| // Parse the optional fields. |
| if (member_type) { |
| RaftPeerPB::MemberType member_type_val; |
| string uppercase_member_type; |
| ToUpperCase(*member_type, &uppercase_member_type); |
| if (!RaftPeerPB::MemberType_Parse(uppercase_member_type, &member_type_val)) { |
| return Status::InvalidArgument("Unrecognized member_type", *member_type); |
| } |
| peer_pb.set_member_type(member_type_val); |
| } |
| |
| // Validate the existence of the optional fields. |
| if (!member_type && (cc_type == consensus::ADD_SERVER || cc_type == consensus::CHANGE_ROLE)) { |
| return Status::InvalidArgument("Must specify member_type when adding " |
| "a server or changing a role"); |
| } |
| |
| // Look up RPC address of peer if adding as a new server. |
| if (cc_type == consensus::ADD_SERVER) { |
| HostPort host_port; |
| RETURN_NOT_OK(GetFirstRpcAddressForTS(peer_uuid, &host_port)); |
| RETURN_NOT_OK(HostPortToPB(host_port, peer_pb.mutable_last_known_addr())); |
| } |
| |
| // Look up the location of the tablet leader from the Master. |
| TSInfoPB leader_ts_info; |
| RETURN_NOT_OK(GetTabletLeader(tablet_id, &leader_ts_info)); |
| CHECK_GT(leader_ts_info.rpc_addresses_size(), 0) << leader_ts_info.ShortDebugString(); |
| |
| HostPort leader_hostport; |
| RETURN_NOT_OK(HostPortFromPB(leader_ts_info.rpc_addresses(0), &leader_hostport)); |
| vector<Sockaddr> leader_addrs; |
| RETURN_NOT_OK(leader_hostport.ResolveAddresses(&leader_addrs)); |
| CHECK(!leader_addrs.empty()) << "Unable to resolve IP address for tablet leader host: " |
| << leader_hostport.ToString(); |
| gscoped_ptr<ConsensusServiceProxy> consensus_proxy( |
| new ConsensusServiceProxy(messenger_, leader_addrs[0])); |
| |
| consensus::ChangeConfigRequestPB req; |
| consensus::ChangeConfigResponsePB resp; |
| RpcController rpc; |
| rpc.set_timeout(timeout_); |
| |
| req.set_dest_uuid(leader_ts_info.permanent_uuid()); |
| req.set_tablet_id(tablet_id); |
| req.set_type(cc_type); |
| *req.mutable_server() = peer_pb; |
| |
| RETURN_NOT_OK(consensus_proxy->ChangeConfig(req, &resp, &rpc)); |
| if (resp.has_error()) { |
| return StatusFromPB(resp.error().status()); |
| } |
| return Status::OK(); |
| } |
| |
| Status ClusterAdminClient::GetTabletLocations(const string& tablet_id, |
| TabletLocationsPB* locations) { |
| rpc::RpcController rpc; |
| rpc.set_timeout(timeout_); |
| master::GetTabletLocationsRequestPB req; |
| *req.add_tablet_ids() = tablet_id; |
| master::GetTabletLocationsResponsePB resp; |
| RETURN_NOT_OK(master_proxy_->GetTabletLocations(req, &resp, &rpc)); |
| |
| if (resp.has_error()) { |
| return StatusFromPB(resp.error().status()); |
| } |
| |
| if (resp.errors_size() > 0) { |
| // This tool only needs to support one-by-one requests for tablet |
| // locations, so we only look at the first error. |
| return StatusFromPB(resp.errors(0).status()); |
| } |
| |
| // Same as above, no batching, and we already got past the error checks. |
| CHECK_EQ(1, resp.tablet_locations_size()) << resp.ShortDebugString(); |
| |
| *locations = resp.tablet_locations(0); |
| return Status::OK(); |
| } |
| |
| Status ClusterAdminClient::GetTabletLeader(const string& tablet_id, |
| TSInfoPB* ts_info) { |
| TabletLocationsPB locations; |
| RETURN_NOT_OK(GetTabletLocations(tablet_id, &locations)); |
| CHECK_EQ(tablet_id, locations.tablet_id()) << locations.ShortDebugString(); |
| bool found = false; |
| BOOST_FOREACH(const TabletLocationsPB::ReplicaPB& replica, locations.replicas()) { |
| if (replica.role() == RaftPeerPB::LEADER) { |
| *ts_info = replica.ts_info(); |
| found = true; |
| break; |
| } |
| } |
| if (!found) { |
| return Status::NotFound("No leader replica found for tablet", tablet_id); |
| } |
| return Status::OK(); |
| } |
| |
| Status ClusterAdminClient::ListTabletServers( |
| RepeatedPtrField<ListTabletServersResponsePB::Entry>* servers) { |
| |
| rpc::RpcController rpc; |
| rpc.set_timeout(timeout_); |
| ListTabletServersRequestPB req; |
| ListTabletServersResponsePB resp; |
| RETURN_NOT_OK(master_proxy_->ListTabletServers(req, &resp, &rpc)); |
| |
| if (resp.has_error()) { |
| return StatusFromPB(resp.error().status()); |
| } |
| |
| servers->Swap(resp.mutable_servers()); |
| return Status::OK(); |
| } |
| |
| Status ClusterAdminClient::GetFirstRpcAddressForTS(const std::string& uuid, HostPort* hp) { |
| RepeatedPtrField<ListTabletServersResponsePB::Entry> servers; |
| RETURN_NOT_OK(ListTabletServers(&servers)); |
| BOOST_FOREACH(const ListTabletServersResponsePB::Entry& server, servers) { |
| if (server.instance_id().permanent_uuid() == uuid) { |
| if (!server.has_registration() || server.registration().rpc_addresses_size() == 0) { |
| break; |
| } |
| RETURN_NOT_OK(HostPortFromPB(server.registration().rpc_addresses(0), hp)); |
| return Status::OK(); |
| } |
| } |
| |
| return Status::NotFound(Substitute("Server with UUID $0 has no RPC address " |
| "registered with the Master", uuid)); |
| } |
| |
| static void SetUsage(const char* argv0) { |
| ostringstream str; |
| |
| str << argv0 << " [-master_addresses server1,server2,server3] <operation> <operation-arguments>\n" |
| << "<operation> must be one of:\n" |
| << " " << kChangeConfigOp << " <tablet_id> " |
| << "<ADD_SERVER|REMOVE_SERVER|CHANGE_ROLE> <peer_uuid> " |
| << "[VOTER|NON_VOTER]"; |
| google::SetUsageMessage(str.str()); |
| } |
| |
| static string GetOp(int argc, char** argv) { |
| if (argc < 2) { |
| google::ShowUsageWithFlagsRestrict(argv[0], __FILE__); |
| exit(1); |
| } |
| |
| return argv[1]; |
| } |
| |
| static int ClusterAdminCliMain(int argc, char** argv) { |
| g_progname = argv[0]; |
| FLAGS_logtostderr = 1; |
| SetUsage(argv[0]); |
| ParseCommandLineFlags(&argc, &argv, true); |
| InitGoogleLoggingSafe(argv[0]); |
| const string addrs = FLAGS_master_addresses; |
| |
| string op = GetOp(argc, argv); |
| |
| ClusterAdminClient client(addrs, FLAGS_timeout_ms); |
| |
| EXIT_NOT_OK_PREPEND(client.Init(), "Unable to establish connection to " + addrs); |
| |
| if (op == kChangeConfigOp) { |
| if (argc < 5) { |
| google::ShowUsageWithFlagsRestrict(argv[0], __FILE__); |
| exit(1); |
| } |
| string tablet_id = argv[2]; |
| string change_type = argv[3]; |
| string peer_uuid = argv[4]; |
| boost::optional<string> member_type; |
| if (argc > 5) { |
| member_type = argv[5]; |
| } |
| Status s = client.ChangeConfig(tablet_id, change_type, peer_uuid, member_type); |
| if (!s.ok()) { |
| std::cerr << "Unable to change config: " << s.ToString() << std::endl; |
| return 1; |
| } |
| } else { |
| std::cerr << "Invalid operation: " << op << std::endl; |
| google::ShowUsageWithFlagsRestrict(argv[0], __FILE__); |
| exit(1); |
| } |
| |
| return 0; |
| } |
| |
| } // namespace tools |
| } // namespace kudu |
| |
| int main(int argc, char** argv) { |
| return kudu::tools::ClusterAdminCliMain(argc, argv); |
| } |