| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/mini-cluster/external_mini_cluster.h" |
| |
| #include <algorithm> |
| #include <csignal> |
| #include <cstdint> |
| #include <cstdlib> |
| #include <functional> |
| #include <iterator> |
| #include <memory> |
| #include <string> |
| #include <thread> |
| #include <unordered_set> |
| #include <utility> |
| |
| #include <gflags/gflags.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/client/client.h" |
| #include "kudu/client/master_rpc.h" |
| #include "kudu/fs/default_key_provider.h" |
| #include "kudu/fs/fs.pb.h" |
| #include "kudu/fs/key_provider.h" |
| #include "kudu/postgres/mini_postgres.h" |
| #include "kudu/ranger-kms/mini_ranger_kms.h" |
| #include "kudu/rpc/rpc_header.pb.h" |
| #if !defined(NO_CHRONY) |
| #include "kudu/clock/test/mini_chronyd.h" |
| #endif |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/common/wire_protocol.pb.h" |
| #include "kudu/gutil/basictypes.h" |
| #include "kudu/gutil/strings/escaping.h" |
| #include "kudu/gutil/strings/join.h" |
| #include "kudu/gutil/strings/stringpiece.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/gutil/strings/util.h" |
| #include "kudu/hms/mini_hms.h" |
| #include "kudu/master/master.pb.h" |
| #include "kudu/master/master.proxy.h" |
| #include "kudu/ranger/mini_ranger.h" |
| #include "kudu/rpc/messenger.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/rpc/sasl_common.h" |
| #include "kudu/rpc/user_credentials.h" |
| #include "kudu/security/test/mini_kdc.h" |
| #include "kudu/server/server_base.pb.h" |
| #include "kudu/server/server_base.proxy.h" |
| #include "kudu/tablet/metadata.pb.h" |
| #include "kudu/tablet/tablet.pb.h" |
| #include "kudu/tserver/tserver.pb.h" |
| #include "kudu/tserver/tserver_admin.proxy.h" |
| #include "kudu/tserver/tserver_service.proxy.h" |
| #include "kudu/util/async_util.h" |
| #include "kudu/util/env.h" |
| #include "kudu/util/env_util.h" |
| #include "kudu/util/fault_injection.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/sockaddr.h" |
| #include "kudu/util/net/socket.h" |
| #include "kudu/util/path_util.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/status.h" |
| #include "kudu/util/stopwatch.h" |
| #include "kudu/util/subprocess.h" |
| #include "kudu/util/test_util.h" |
| |
| using kudu::client::internal::ConnectToClusterRpc; |
| #if !defined(NO_CHRONY) |
| using kudu::clock::MiniChronyd; |
| #endif |
| using kudu::master::ListTablesRequestPB; |
| using kudu::master::ListTablesResponsePB; |
| using kudu::master::MasterServiceProxy; |
| using kudu::pb_util::SecureDebugString; |
| using kudu::pb_util::SecureShortDebugString; |
| using kudu::rpc::RpcController; |
| using kudu::security::DefaultKeyProvider; |
| using kudu::server::ServerStatusPB; |
| using kudu::tserver::ListTabletsRequestPB; |
| using kudu::tserver::ListTabletsResponsePB; |
| using kudu::tserver::TabletServerAdminServiceProxy; |
| using kudu::tserver::TabletServerServiceProxy; |
| using std::copy; |
| using std::map; |
| using std::pair; |
| using std::string; |
| using std::unique_ptr; |
| using std::unordered_set; |
| using std::vector; |
| using strings::a2b_hex; |
| using strings::Substitute; |
| |
// Shorthand alias for the nested StatusAndSchemaPB message of
// ListTabletsResponsePB.
typedef ListTabletsResponsePB::StatusAndSchemaPB StatusAndSchemaPB;

// Flags declared here so this file can read (and, for the DNS override,
// temporarily set) their values in the current process.
DECLARE_bool(encrypt_data_at_rest);
DECLARE_string(block_manager);
DECLARE_string(dns_addr_resolution_override);

// When set, each daemon's options get a 'perf_record_filename' pointing into
// the daemon's log directory.
DEFINE_bool(perf_record, false,
            "Whether to run \"perf record --call-graph fp\" on each daemon in the cluster");
| |
| namespace kudu { |
| namespace cluster { |
| |
// Name of the executable (resolved relative to the daemon bin path) that is
// launched for both masters and tablet servers.
static const char* const kKuduBinaryName = "kudu";
// How long, in seconds, Start() and Restart() wait for the tablet servers to
// register with the masters.
static double kTabletServerRegistrationTimeoutSeconds = 15.0;
// NOTE(review): not referenced in the visible part of this file; by its name,
// a timeout in seconds related to the master catalog manager -- confirm at
// its use site.
static double kMasterCatalogManagerTimeoutSeconds = 60.0;
| |
// Sets the defaults listed below: a single master and a single tablet server
// with one data directory each, no Kerberos, no HMS, no Ranger / Ranger KMS.
// Encryption at rest follows the value of --encrypt_data_at_rest in the
// current process. When chrony support is compiled in, one test NTP server is
// requested and daemons are configured with the full list of NTP servers.
ExternalMiniClusterOptions::ExternalMiniClusterOptions()
    : num_masters(1),
      supply_single_master_addr(true),
      num_tablet_servers(1),
      bind_mode(kDefaultBindMode),
      num_data_dirs(1),
      enable_kerberos(false),
      principal("kudu"),
      hms_mode(HmsMode::NONE),
      enable_ranger(false),
      enable_ranger_kms(false),
      ranger_cluster_key("kuduclusterkey"),
      enable_encryption(FLAGS_encrypt_data_at_rest),
      logtostderr(true),
      start_process_timeout(MonoDelta::FromSeconds(70)),
      rpc_negotiation_timeout(MonoDelta::FromSeconds(3))
#if !defined(NO_CHRONY)
      ,
      num_ntp_servers(1),
      ntp_config_mode(BuiltinNtpConfigMode::ALL_SERVERS)
#endif // #if !defined(NO_CHRONY) ...
{
}
| |
| ExternalMiniCluster::ExternalMiniCluster() |
| : opts_(ExternalMiniClusterOptions()) { |
| } |
| |
| ExternalMiniCluster::ExternalMiniCluster(ExternalMiniClusterOptions opts) |
| : opts_(std::move(opts)) { |
| } |
| |
| ExternalMiniCluster::~ExternalMiniCluster() { |
| Shutdown(); |
| } |
| |
| Env* ExternalMiniCluster::env() const { |
| return Env::Default(); |
| } |
| |
| Env* ExternalMiniCluster::ts_env(int ts_idx) const { |
| return tablet_server(ts_idx)->env(); |
| } |
| |
| Env* ExternalMiniCluster::master_env(int master_idx) const { |
| return master(master_idx)->env(); |
| } |
| |
| Status ExternalMiniCluster::DeduceBinRoot(std::string* ret) { |
| string exe; |
| RETURN_NOT_OK(env()->GetExecutablePath(&exe)); |
| *ret = DirName(exe); |
| return Status::OK(); |
| } |
| |
| Status ExternalMiniCluster::HandleOptions() { |
| if (opts_.daemon_bin_path.empty()) { |
| RETURN_NOT_OK(DeduceBinRoot(&opts_.daemon_bin_path)); |
| } |
| |
| if (opts_.cluster_root.empty()) { |
| // If they don't specify a cluster root, use the current gtest directory. |
| opts_.cluster_root = JoinPathSegments(GetTestDataDirectory(), "minicluster-data"); |
| } |
| |
| if (opts_.block_manager_type.empty()) { |
| opts_.block_manager_type = FLAGS_block_manager; |
| } |
| |
| vector<string> host_mappings; |
| if (!opts_.tserver_alias_prefix.empty()) { |
| for (int i = 0; i < opts_.num_tablet_servers; i++) { |
| host_mappings.emplace_back(Substitute("$0.$1=$2", opts_.tserver_alias_prefix, i, |
| GetBindIpForTabletServer(i))); |
| } |
| } |
| if (!opts_.master_alias_prefix.empty()) { |
| for (int i = 0; i < opts_.num_masters; i++) { |
| host_mappings.emplace_back(Substitute("$0.$1=$2", opts_.master_alias_prefix, i, |
| GetBindIpForMaster(i))); |
| } |
| } |
| if (!host_mappings.empty()) { |
| dns_overrides_ = JoinStrings(host_mappings, ","); |
| opts_.extra_master_flags.emplace_back( |
| Substitute("--dns_addr_resolution_override=$0", dns_overrides_)); |
| opts_.extra_tserver_flags.emplace_back( |
| Substitute("--dns_addr_resolution_override=$0", dns_overrides_)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status ExternalMiniCluster::AddTimeSourceFlags( |
| int idx, std::vector<std::string>* flags) { |
| DCHECK_LE(0, idx); |
| DCHECK(flags); |
| #if defined(NO_CHRONY) |
| flags->emplace_back("--time_source=system_unsync"); |
| #else |
| CHECK_LE(0, opts_.num_ntp_servers); |
| if (opts_.num_ntp_servers == 0) { |
| flags->emplace_back("--time_source=system_unsync"); |
| } else { |
| vector<string> ntp_endpoints; |
| CHECK_EQ(opts_.num_ntp_servers, ntp_servers_.size()); |
| // Point the built-in NTP client to the test NTP servers. |
| switch (opts_.ntp_config_mode) { |
| case BuiltinNtpConfigMode::ALL_SERVERS: |
| for (const auto& server : ntp_servers_) { |
| ntp_endpoints.emplace_back(server->address().ToString()); |
| } |
| break; |
| case BuiltinNtpConfigMode::ROUND_ROBIN_SINGLE_SERVER: |
| ntp_endpoints.emplace_back( |
| ntp_servers_[idx % opts_.num_ntp_servers]->address().ToString()); |
| break; |
| } |
| flags->emplace_back(Substitute("--builtin_ntp_servers=$0", |
| JoinStrings(ntp_endpoints, ","))); |
| // The chronyd server supports very short polling interval: let's use this |
| // feature for faster clock synchronisation at startup and to keep the |
| // estimated clock error of the built-in NTP client smaller. |
| flags->emplace_back(Substitute("--builtin_ntp_poll_interval_ms=100")); |
| // Wait up to 10 seconds to let the built-in NTP client to synchronize its |
| // time with the test NTP server. |
| flags->emplace_back(Substitute("--ntp_initial_sync_wait_secs=10")); |
| // Switch the clock to use the built-in NTP client which clock is |
| // synchronized with the test NTP server. |
| flags->emplace_back("--time_source=builtin"); |
| } |
| #endif // #if defined(NO_CHRONY) ... else ... |
| return Status::OK(); |
| } |
| |
| Status ExternalMiniCluster::Start() { |
| CHECK(masters_.empty()) << "Masters are not empty (size: " << masters_.size() |
| << "). Maybe you meant Restart()?"; |
| CHECK(tablet_servers_.empty()) << "Tablet servers are not empty (size: " |
| << tablet_servers_.size() << "). Maybe you meant Restart()?"; |
| RETURN_NOT_OK(HandleOptions()); |
| gflags::FlagSaver saver; |
| FLAGS_dns_addr_resolution_override = dns_overrides_; |
| |
| RETURN_NOT_OK_PREPEND( |
| rpc::MessengerBuilder("minicluster-messenger") |
| .set_num_reactors(1) |
| .set_max_negotiation_threads(1) |
| .set_rpc_negotiation_timeout_ms(opts_.rpc_negotiation_timeout.ToMilliseconds()) |
| .set_sasl_proto_name(opts_.principal) |
| .Build(&messenger_), |
| "Failed to start Messenger for minicluster"); |
| |
| Status s = env()->CreateDir(opts_.cluster_root); |
| if (!s.ok() && !s.IsAlreadyPresent()) { |
| RETURN_NOT_OK_PREPEND(s, "Could not create root dir " + opts_.cluster_root); |
| } |
| |
| if (opts_.enable_kerberos) { |
| kdc_.reset(new MiniKdc(opts_.mini_kdc_options)); |
| RETURN_NOT_OK(kdc_->Start()); |
| RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("test-admin"), |
| "could not create admin principal"); |
| RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("test-user"), |
| "could not create user principal"); |
| RETURN_NOT_OK_PREPEND(kdc_->CreateUserPrincipal("joe-interloper"), |
| "could not create unauthorized principal"); |
| |
| RETURN_NOT_OK_PREPEND(kdc_->CreateKeytabForExistingPrincipal("test-user"), |
| "could not create client keytab"); |
| |
| RETURN_NOT_OK_PREPEND(kdc_->Kinit("test-admin"), |
| "could not kinit as admin"); |
| |
| RETURN_NOT_OK_PREPEND(kdc_->SetKrb5Environment(), |
| "could not set krb5 client env"); |
| } |
| |
| #if !defined(NO_CHRONY) |
| // Start NTP servers, if requested. |
| if (opts_.num_ntp_servers > 0) { |
| // Collect and keep alive the set of sockets bound with SO_REUSEPORT option |
| // until all chronyd proccesses are started. This allows to avoid port |
| // conflicts: chronyd doesn't support binding to wildcard addresses and |
| // it's necessary to make sure chronyd is able to bind to the port specified |
| // in its configuration. So, the mini-cluster reserves a set of ports up |
| // front, then starts the set of chronyd processes, each bound to one |
| // of the reserved ports. |
| vector<unique_ptr<Socket>> reserved_sockets; |
| for (auto i = 0; i < opts_.num_ntp_servers; ++i) { |
| unique_ptr<Socket> reserved_socket; |
| RETURN_NOT_OK_PREPEND(ReserveDaemonSocket( |
| DaemonType::EXTERNAL_SERVER, i, opts_.bind_mode, &reserved_socket), |
| "failed to reserve chronyd socket address"); |
| Sockaddr addr; |
| RETURN_NOT_OK(reserved_socket->GetSocketAddress(&addr)); |
| reserved_sockets.emplace_back(std::move(reserved_socket)); |
| |
| RETURN_NOT_OK_PREPEND(AddNtpServer(addr), |
| Substitute("failed to start NTP server $0", i)); |
| } |
| } |
| #endif // #if !defined(NO_CHRONY) ... |
| |
| if (opts_.enable_ranger || opts_.enable_ranger_kms) { |
| if (!postgres_ || !postgres_->IsStarted()) { |
| postgres_.reset(new postgres::MiniPostgres(cluster_root(), GetBindIpForExternalServer(0))); |
| } |
| string host = GetBindIpForExternalServer(0); |
| ranger_.reset(new ranger::MiniRanger(cluster_root(), host, postgres_)); |
| if (opts_.enable_kerberos) { |
| |
| // The SPNs match the ones defined in mini_ranger_configs.h. |
| string admin_keytab; |
| RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab( |
| Substitute("rangeradmin/$0@KRBTEST.COM", host), |
| &admin_keytab), |
| "could not create rangeradmin keytab"); |
| |
| string lookup_keytab; |
| RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab( |
| Substitute("rangerlookup/$0@KRBTEST.COM", host), |
| &lookup_keytab), |
| "could not create rangerlookup keytab"); |
| |
| string spnego_keytab; |
| RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab( |
| Substitute("HTTP/$0@KRBTEST.COM", host), |
| &spnego_keytab), |
| "could not create ranger HTTP keytab"); |
| |
| ranger_->EnableKerberos(kdc_->GetEnvVars()["KRB5_CONFIG"], admin_keytab, |
| lookup_keytab, spnego_keytab); |
| } |
| |
| RETURN_NOT_OK_PREPEND(ranger_->Start(), "Failed to start the Ranger service"); |
| RETURN_NOT_OK_PREPEND(ranger_->CreateClientConfig(JoinPathSegments(cluster_root(), |
| "ranger-client")), |
| "Failed to write Ranger client config"); |
| } |
| |
| if (opts_.enable_ranger_kms) { |
| string host = GetBindIpForExternalServer(0); |
| ranger_kms_.reset(new rangerkms::MiniRangerKMS(cluster_root(), host, postgres_, ranger_)); |
| if (opts_.enable_kerberos) { |
| string keytab; |
| RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab( |
| Substitute("rangerkms/$0@KRBTEST.COM", host), |
| &keytab), |
| "could not create ranger kms keytab"); |
| string spnego_keytab; |
| RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab( |
| Substitute("HTTP/$0@KRBTEST.COM", host), |
| &spnego_keytab), |
| "could not create ranger kms keytab"); |
| |
| ranger_kms_->EnableKerberos(kdc_->GetEnvVars()["KRB5_CONFIG"], keytab, spnego_keytab); |
| } |
| RETURN_NOT_OK(kdc_->CreateUserPrincipal("keyadmin")); |
| RETURN_NOT_OK(kdc_->Kinit("keyadmin")); |
| RETURN_NOT_OK_PREPEND(ranger_kms_->Start(), "Failed to start the Ranger KMS service"); |
| RETURN_NOT_OK_PREPEND(ranger_kms_->CreateClusterKey(opts_.ranger_cluster_key, |
| &opts_.ranger_cluster_key_version), |
| "Failed to create cluster key");; |
| RETURN_NOT_OK(kdc_->Kinit("test-admin")); |
| } |
| |
| // Start the HMS. |
| if (opts_.hms_mode == HmsMode::DISABLE_HIVE_METASTORE || |
| opts_.hms_mode == HmsMode::ENABLE_HIVE_METASTORE || |
| opts_.hms_mode == HmsMode::ENABLE_METASTORE_INTEGRATION) { |
| hms_.reset(new hms::MiniHms()); |
| hms_->SetDataRoot(opts_.cluster_root); |
| |
| if (opts_.hms_mode == HmsMode::DISABLE_HIVE_METASTORE) { |
| hms_->EnableKuduPlugin(false); |
| } |
| |
| if (opts_.enable_kerberos) { |
| string spn = Substitute("hive/$0", hms_->address().host()); |
| string ktpath; |
| RETURN_NOT_OK_PREPEND(kdc_->CreateServiceKeytab(spn, &ktpath), |
| "could not create keytab"); |
| hms_->EnableKerberos(kdc_->GetEnvVars()["KRB5_CONFIG"], spn, ktpath, |
| rpc::SaslProtection::kAuthentication); |
| |
| // Set the protocol name in the environment so that the KuduMetastorePlugin |
| // can communicate with Kudu when a custom name is used. |
| hms_->AddEnvVar("KUDU_SASL_PROTOCOL_NAME", opts_.principal); |
| } |
| |
| RETURN_NOT_OK_PREPEND(hms_->Start(), |
| "Failed to start the Hive Metastore"); |
| } |
| |
| RETURN_NOT_OK_PREPEND(StartMasters(), "failed to start masters"); |
| |
| for (int i = 1; i <= opts_.num_tablet_servers; i++) { |
| RETURN_NOT_OK_PREPEND(AddTabletServer(), Substitute("failed to start tablet server $0", i)); |
| } |
| RETURN_NOT_OK(WaitForTabletServerCount( |
| opts_.num_tablet_servers, |
| MonoDelta::FromSeconds(kTabletServerRegistrationTimeoutSeconds))); |
| |
| return Status::OK(); |
| } |
| |
| void ExternalMiniCluster::ShutdownNodes(ClusterNodes nodes) { |
| if (nodes == ClusterNodes::ALL || nodes == ClusterNodes::TS_ONLY) { |
| for (const scoped_refptr<ExternalTabletServer>& ts : tablet_servers_) { |
| ts->Shutdown(); |
| } |
| } |
| if (nodes == ClusterNodes::ALL || nodes == ClusterNodes::MASTERS_ONLY) { |
| for (const scoped_refptr<ExternalMaster>& master : masters_) { |
| if (master) { |
| master->Shutdown(); |
| } |
| } |
| } |
| } |
| |
| Status ExternalMiniCluster::Restart() { |
| for (const scoped_refptr<ExternalMaster>& master : masters_) { |
| if (master && master->IsShutdown()) { |
| if (opts_.hms_mode == HmsMode::ENABLE_METASTORE_INTEGRATION) { |
| master->SetMetastoreIntegration(hms_->uris(), opts_.enable_kerberos); |
| } |
| RETURN_NOT_OK_PREPEND(master->Restart(), "Cannot restart master bound at: " + |
| master->bound_rpc_hostport().ToString()); |
| } |
| } |
| |
| for (const scoped_refptr<ExternalTabletServer>& ts : tablet_servers_) { |
| if (ts->IsShutdown()) { |
| RETURN_NOT_OK_PREPEND(ts->Restart(), "Cannot restart tablet server bound at: " + |
| ts->bound_rpc_hostport().ToString()); |
| } |
| } |
| |
| RETURN_NOT_OK(WaitForTabletServerCount( |
| tablet_servers_.size(), |
| MonoDelta::FromSeconds(kTabletServerRegistrationTimeoutSeconds))); |
| |
| return Status::OK(); |
| } |
| |
| void ExternalMiniCluster::EnableMetastoreIntegration() { |
| opts_.hms_mode = HmsMode::ENABLE_METASTORE_INTEGRATION; |
| } |
| |
| void ExternalMiniCluster::DisableMetastoreIntegration() { |
| for (const auto& master : masters_) { |
| CHECK(master->IsShutdown()) << "Call Shutdown() before changing the HMS mode"; |
| master->mutable_flags()->erase( |
| std::remove_if( |
| master->mutable_flags()->begin(), master->mutable_flags()->end(), |
| [] (const string& flag) { |
| return StringPiece(flag).starts_with("--hive_metastore"); |
| }), |
| master->mutable_flags()->end()); |
| } |
| opts_.hms_mode = HmsMode::ENABLE_HIVE_METASTORE; |
| } |
| |
| void ExternalMiniCluster::SetDaemonBinPath(string daemon_bin_path) { |
| opts_.daemon_bin_path = std::move(daemon_bin_path); |
| for (auto& master : masters_) { |
| master->SetExePath(GetBinaryPath(kKuduBinaryName)); |
| } |
| for (auto& ts : tablet_servers_) { |
| ts->SetExePath(GetBinaryPath(kKuduBinaryName)); |
| } |
| } |
| |
| string ExternalMiniCluster::GetBinaryPath(const string& binary) const { |
| CHECK(!opts_.daemon_bin_path.empty()); |
| return JoinPathSegments(opts_.daemon_bin_path, binary); |
| } |
| |
| string ExternalMiniCluster::GetLogPath(const string& daemon_id) const { |
| CHECK(!opts_.cluster_root.empty()); |
| return JoinPathSegments(JoinPathSegments(opts_.cluster_root, daemon_id), "logs"); |
| } |
| |
| string ExternalMiniCluster::GetDataPath(const string& daemon_id, |
| std::optional<uint32_t> dir_index) const { |
| CHECK(!opts_.cluster_root.empty()); |
| string data_path = "data"; |
| if (dir_index) { |
| CHECK_LT(*dir_index, opts_.num_data_dirs); |
| data_path = Substitute("$0-$1", data_path, *dir_index); |
| } else { |
| CHECK_EQ(1, opts_.num_data_dirs); |
| } |
| return JoinPathSegments(JoinPathSegments(opts_.cluster_root, daemon_id), data_path); |
| } |
| |
| vector<string> ExternalMiniCluster::GetDataPaths(const string& daemon_id) const { |
| if (opts_.num_data_dirs == 1) { |
| return { GetDataPath(daemon_id) }; |
| } |
| vector<string> paths; |
| for (uint32_t dir_index = 0; dir_index < opts_.num_data_dirs; dir_index++) { |
| paths.emplace_back(GetDataPath(daemon_id, dir_index)); |
| } |
| return paths; |
| } |
| |
| string ExternalMiniCluster::GetWalPath(const string& daemon_id) const { |
| CHECK(!opts_.cluster_root.empty()); |
| return JoinPathSegments(JoinPathSegments(opts_.cluster_root, daemon_id), "wal"); |
| } |
| |
| namespace { |
| vector<string> SubstituteInFlags(const vector<string>& orig_flags, int index) { |
| string str_index = strings::Substitute("$0", index); |
| vector<string> ret; |
| for (const string& orig : orig_flags) { |
| ret.push_back(StringReplace(orig, "${index}", str_index, true)); |
| } |
| return ret; |
| } |
| } // anonymous namespace |
| |
| Status ExternalMiniCluster::StartMasters() { |
| int num_masters = opts_.num_masters; |
| |
| // Collect and keep alive the set of master sockets bound with SO_REUSEPORT |
| // until all master proccesses are started. This allows the mini-cluster to |
| // reserve a set of ports up front, then later start the set of masters, each |
| // configured with the full set of ports. |
| // |
| // TODO(dan): re-bind the ports between node restarts in order to prevent other |
| // processess from binding to them in the interim. |
| vector<unique_ptr<Socket>> reserved_sockets; |
| vector<HostPort> master_rpc_addrs; |
| |
| if (!opts_.master_rpc_addresses.empty()) { |
| CHECK_EQ(opts_.master_rpc_addresses.size(), num_masters); |
| master_rpc_addrs = opts_.master_rpc_addresses; |
| } else { |
| for (int i = 0; i < num_masters; i++) { |
| unique_ptr<Socket> reserved_socket; |
| RETURN_NOT_OK_PREPEND(ReserveDaemonSocket(DaemonType::MASTER, i, opts_.bind_mode, |
| &reserved_socket), |
| "failed to reserve master socket address"); |
| Sockaddr addr; |
| RETURN_NOT_OK(reserved_socket->GetSocketAddress(&addr)); |
| master_rpc_addrs.emplace_back( |
| opts_.master_alias_prefix.empty() ? |
| addr.host() : Substitute("$0.$1", opts_.master_alias_prefix, i), |
| addr.port()); |
| reserved_sockets.emplace_back(std::move(reserved_socket)); |
| } |
| } |
| // Start the masters. |
| for (int i = 0; i < num_masters; i++) { |
| scoped_refptr<ExternalMaster> peer; |
| RETURN_NOT_OK(CreateMaster(master_rpc_addrs, i, &peer)); |
| RETURN_NOT_OK_PREPEND(peer->Start(), Substitute("Unable to start Master at index $0", i)); |
| RETURN_NOT_OK(peer->SetServerKey()); |
| masters_.emplace_back(std::move(peer)); |
| } |
| return Status::OK(); |
| } |
| |
| string ExternalMiniCluster::GetBindIpForTabletServer(int index) const { |
| return MiniCluster::GetBindIpForDaemonWithType(MiniCluster::TSERVER, index, |
| opts_.bind_mode); |
| } |
| |
| string ExternalMiniCluster::GetBindIpForMaster(int index) const { |
| return MiniCluster::GetBindIpForDaemonWithType(MiniCluster::MASTER, index, |
| opts_.bind_mode); |
| } |
| |
| string ExternalMiniCluster::GetBindIpForExternalServer(int index) const { |
| return MiniCluster::GetBindIpForDaemonWithType(MiniCluster::EXTERNAL_SERVER, |
| index, opts_.bind_mode); |
| } |
| |
| Status ExternalMiniCluster::AddTabletServer() { |
| CHECK(leader_master() != nullptr) |
| << "Must have started at least 1 master before adding tablet servers"; |
| |
| const int idx = tablet_servers_.size(); |
| const string daemon_id = Substitute("ts-$0", idx); |
| const string bind_host = GetBindIpForTabletServer(idx); |
| |
| ExternalDaemonOptions opts; |
| opts.messenger = messenger_; |
| opts.enable_encryption = opts_.enable_encryption; |
| opts.enable_ranger_kms = opts_.enable_ranger_kms; |
| opts.ranger_cluster_key = opts_.ranger_cluster_key; |
| if (opts.enable_ranger_kms) { |
| opts.ranger_kms_url = ranger_kms_->url(); |
| } |
| opts.block_manager_type = opts_.block_manager_type; |
| opts.exe = GetBinaryPath(kKuduBinaryName); |
| opts.wal_dir = GetWalPath(daemon_id); |
| opts.data_dirs = GetDataPaths(daemon_id); |
| opts.log_dir = GetLogPath(daemon_id); |
| if (FLAGS_perf_record) { |
| opts.perf_record_filename = |
| Substitute("$0/perf-$1.data", opts.log_dir, daemon_id); |
| } |
| vector<string> extra_flags; |
| RETURN_NOT_OK(AddTimeSourceFlags(idx, &extra_flags)); |
| auto flags = SubstituteInFlags(opts_.extra_tserver_flags, idx); |
| copy(flags.begin(), flags.end(), std::back_inserter(extra_flags)); |
| opts.extra_flags = extra_flags; |
| if (!opts_.tserver_alias_prefix.empty()) { |
| opts.extra_flags.emplace_back( |
| Substitute("--host_for_tests=$0.$1", |
| opts_.tserver_alias_prefix, tablet_servers_.size())); |
| } |
| opts.start_process_timeout = opts_.start_process_timeout; |
| opts.rpc_bind_address = HostPort(bind_host, 0); |
| opts.logtostderr = opts_.logtostderr; |
| |
| vector<HostPort> master_hostports = master_rpc_addrs(); |
| scoped_refptr<ExternalTabletServer> ts = new ExternalTabletServer(opts, master_hostports); |
| if (opts_.enable_kerberos) { |
| RETURN_NOT_OK_PREPEND(ts->EnableKerberos(kdc_.get(), opts_.principal, bind_host), |
| "could not enable Kerberos"); |
| } |
| |
| RETURN_NOT_OK(ts->Start()); |
| RETURN_NOT_OK(ts->SetServerKey()); |
| tablet_servers_.push_back(ts); |
| return Status::OK(); |
| } |
| |
// Builds (but does not start) the master with index 'idx' and stores it in
// 'master'.
//
// 'master_rpc_addrs' is the full list of master RPC addresses; the entry at
// 'idx' becomes this master's RPC bind address. The master's flags are
// assembled in this order: time source flags, flags derived from the cluster
// options, then the user-supplied extra master flags; finally "${index}" is
// expanded in all of them.
//
// Returns a non-OK status if the binary directory cannot be deduced (only
// needed when location info is set), if the time source flags cannot be
// built, or if enabling Kerberos fails.
Status ExternalMiniCluster::CreateMaster(const vector<HostPort>& master_rpc_addrs, int idx,
                                         scoped_refptr<ExternalMaster>* master) {
  DCHECK_LT(idx, master_rpc_addrs.size());
  vector<string> flags;
  // We expect that callers have reserved a socket for the master we're
  // creating, and we'll thus have to have the daemon reuse the bound port.
  flags.emplace_back("--rpc_reuseport=true");
  // Setting --master_addresses flag for a single master configuration is now
  // supported but not mandatory. Not setting the flag helps test existing kudu
  // deployments that don't specify the --master_addresses flag for single
  // master configuration.
  if (master_rpc_addrs.size() > 1 || opts_.supply_single_master_addr) {
    flags.emplace_back(Substitute("--master_addresses=$0",
                                  HostPort::ToCommaSeparatedString(master_rpc_addrs)));
  }
  if (!opts_.location_info.empty()) {
    // The mapping script is looked up relative to the directory of the
    // currently running executable, not relative to the daemon bin path.
    string bin_path;
    RETURN_NOT_OK(DeduceBinRoot(&bin_path));
    const auto mapping_script_path =
        JoinPathSegments(bin_path, "testdata/assign-location.py");
    const auto state_store_fpath =
        JoinPathSegments(opts_.cluster_root, "location-assignment.state");
    auto location_cmd = Substitute("$0 --state_store=$1",
                                   mapping_script_path, state_store_fpath);
    for (const auto& elem : opts_.location_info) {
      // Per-location mapping rule specified as a pair 'location:num_servers',
      // where 'location' is the location string and 'num_servers' is the number
      // of tablet servers to be assigned the location.
      location_cmd += Substitute(" --map $0:$1", elem.first, elem.second);
    }
    flags.emplace_back(Substitute("--location_mapping_cmd=$0", location_cmd));
# if defined(__APPLE__)
    // On macOS, it's not possible to have unique loopback interfaces. To make
    // location mapping working, a tablet server is identified by its UUID
    // instead of IP address of its RPC end-point.
    flags.emplace_back("--location_mapping_by_uuid");
# endif
  }
  if (opts_.hms_mode == HmsMode::ENABLE_METASTORE_INTEGRATION) {
    flags.emplace_back(Substitute("--hive_metastore_uris=$0", hms_->uris()));
    if (opts_.enable_kerberos) {
      flags.emplace_back("--hive_metastore_sasl_enabled=true");
    }
  }
  if (opts_.enable_ranger) {
    // Same "ranger-client" directory that Start() writes the Ranger client
    // config into.
    flags.emplace_back(Substitute("--ranger_config_path=$0",
                                  JoinPathSegments(cluster_root(),
                                                   "ranger-client")));
    flags.emplace_back("--trusted_user_acl=test-admin");
  }
  if (!opts_.master_alias_prefix.empty()) {
    flags.emplace_back(Substitute("--host_for_tests=$0.$1",
                                  opts_.master_alias_prefix, idx));
  }

  if (opts_.enable_encryption) {
    flags.emplace_back("--encrypt_data_at_rest=true");
    if (opts_.enable_ranger_kms) {
      flags.emplace_back("--encryption_key_provider=ranger-kms");
      flags.emplace_back(Substitute("--encryption_cluster_key_name=$0", opts_.ranger_cluster_key));
      flags.emplace_back(Substitute("--ranger_kms_url=$0", ranger_kms_->url()));
    }
  }
  // Add custom master flags.
  copy(opts_.extra_master_flags.begin(), opts_.extra_master_flags.end(),
       std::back_inserter(flags));

  string daemon_id = Substitute("master-$0", idx);

  ExternalDaemonOptions opts;
  opts.messenger = messenger_;
  opts.block_manager_type = opts_.block_manager_type;
  opts.exe = GetBinaryPath(kKuduBinaryName);
  opts.wal_dir = GetWalPath(daemon_id);
  opts.data_dirs = GetDataPaths(daemon_id);
  opts.log_dir = GetLogPath(daemon_id);
  if (FLAGS_perf_record) {
    opts.perf_record_filename =
        Substitute("$0/perf-$1.data", opts.log_dir, daemon_id);
  }

  vector<string> time_source_flags;
  RETURN_NOT_OK(AddTimeSourceFlags(idx, &time_source_flags));
  // Custom flags set above come last because they can contain overrides.
  // Hence the time source flags are inserted at the front.
  flags.insert(flags.begin(), time_source_flags.begin(), time_source_flags.end());

  // Expand "${index}" in every flag with this master's index.
  opts.extra_flags = SubstituteInFlags(flags, idx);
  opts.start_process_timeout = opts_.start_process_timeout;
  opts.rpc_bind_address = master_rpc_addrs[idx];
  opts.logtostderr = opts_.logtostderr;

  scoped_refptr<ExternalMaster> peer(new ExternalMaster(opts));
  if (opts_.enable_kerberos) {
    RETURN_NOT_OK_PREPEND(
        peer->EnableKerberos(kdc_.get(), opts_.principal, master_rpc_addrs[idx].host()),
        "could not enable Kerberos");
  }
  *master = std::move(peer);
  return Status::OK();
}
| |
| Status ExternalMiniCluster::AddMaster(const vector<string>& extra_flags) { |
| const int idx = masters_.size(); |
| const string daemon_id = Substitute("master-$0", idx); |
| |
| unique_ptr<Socket> reserved_socket; |
| RETURN_NOT_OK_PREPEND(ReserveDaemonSocket(DaemonType::MASTER, idx, opts_.bind_mode, |
| &reserved_socket), |
| "failed to reserve master socket address"); |
| Sockaddr addr; |
| RETURN_NOT_OK(reserved_socket->GetSocketAddress(&addr)); |
| vector<HostPort> master_rpc_addrs = this->master_rpc_addrs(); |
| master_rpc_addrs.emplace_back(addr.host(), addr.port()); |
| scoped_refptr<ExternalMaster> peer; |
| RETURN_NOT_OK(CreateMaster(master_rpc_addrs, idx, &peer)); |
| peer->mutable_flags()->insert(peer->mutable_flags()->end(), |
| extra_flags.begin(), extra_flags.end()); |
| RETURN_NOT_OK_PREPEND(peer->Start(), |
| Substitute("unable to start master at index $0", idx)); |
| // Update the existing servers' gflags so the new master is accounted for the |
| // next time they restart. |
| // NOTE: the new master already has the correct list set for this flag, from |
| // the call to CreateMaster(). |
| const auto& new_master_addrs_list = HostPort::ToCommaSeparatedString(master_rpc_addrs); |
| for (auto& master : masters_) { |
| master->mutable_flags()->emplace_back( |
| Substitute("--master_addresses=$0", new_master_addrs_list)); |
| } |
| for (auto& ts : tablet_servers_) { |
| ts->mutable_flags()->emplace_back( |
| Substitute("--tserver_master_addrs=$0", new_master_addrs_list)); |
| } |
| masters_.emplace_back(std::move(peer)); |
| ++opts_.num_masters; |
| return Status::OK(); |
| } |
| |
| #if !defined(NO_CHRONY) |
| Status ExternalMiniCluster::AddNtpServer(const Sockaddr& addr) { |
| clock::MiniChronydOptions options; |
| options.index = ntp_servers_.size(); |
| options.data_root = JoinPathSegments(cluster_root(), |
| Substitute("chrony.$0", options.index)); |
| options.bindaddress = addr.host(); |
| options.port = static_cast<uint16_t>(addr.port()); |
| unique_ptr<MiniChronyd> chrony(new MiniChronyd(std::move(options))); |
| RETURN_NOT_OK(chrony->Start()); |
| ntp_servers_.emplace_back(std::move(chrony)); |
| return Status::OK(); |
| } |
| #endif // #if !defined(NO_CHRONY) ... |
| |
| Status ExternalMiniCluster::WaitForTabletServerCount(int count, const MonoDelta& timeout, |
| int master_idx) { |
| CHECK_LT(master_idx, opts_.num_masters); |
| MonoTime deadline = MonoTime::Now() + timeout; |
| |
| unordered_set<int> masters_to_search; |
| if (master_idx == -1) { |
| for (int i = 0; i < masters_.size(); i++) { |
| if (!masters_[i]->IsShutdown()) { |
| masters_to_search.insert(i); |
| } |
| } |
| } else { |
| if (!masters_[master_idx]->IsShutdown()) { |
| masters_to_search.insert(master_idx); |
| } |
| } |
| |
| while (true) { |
| MonoDelta remaining = deadline - MonoTime::Now(); |
| if (remaining.ToSeconds() < 0) { |
| return Status::TimedOut(Substitute( |
| "Timed out waiting for $0 TS(s) to register with all masters", count)); |
| } |
| bool all_masters_reachable = true; |
| for (const auto& master_idx : masters_to_search) { |
| master::PingRequestPB req; |
| master::PingResponsePB resp; |
| rpc::RpcController rpc; |
| rpc.set_timeout(remaining); |
| Status s = master_proxy(master_idx)->Ping(req, &resp, &rpc); |
| if (!s.ok()) { |
| all_masters_reachable = false; |
| break; |
| } |
| } |
| if (!all_masters_reachable) { |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| continue; |
| } |
| |
| for (auto iter = masters_to_search.begin(); iter != masters_to_search.end();) { |
| master::ListTabletServersRequestPB req; |
| master::ListTabletServersResponsePB resp; |
| rpc::RpcController rpc; |
| rpc.set_timeout(remaining); |
| RETURN_NOT_OK_PREPEND(master_proxy(*iter)->ListTabletServers(req, &resp, &rpc), |
| "ListTabletServers RPC failed"); |
| // ListTabletServers() may return servers that are no longer online. |
| // Do a second step of verification to verify that the descs that we got |
| // are aligned (same uuid/seqno) with the TSs that we have in the cluster. |
| int match_count = 0; |
| for (const master::ListTabletServersResponsePB_Entry& e : resp.servers()) { |
| for (const scoped_refptr<ExternalTabletServer>& ets : tablet_servers_) { |
| if (ets->instance_id().permanent_uuid() == e.instance_id().permanent_uuid() && |
| ets->instance_id().instance_seqno() == e.instance_id().instance_seqno()) { |
| match_count++; |
| break; |
| } |
| } |
| } |
| if (match_count == count) { |
| // This master has returned the correct set of tservers. |
| iter = masters_to_search.erase(iter); |
| } else { |
| iter++; |
| } |
| } |
| |
| if (masters_to_search.empty()) { |
| // All masters have returned the correct set of tservers. |
| LOG(INFO) << count << " TS(s) registered with all masters"; |
| return Status::OK(); |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| } |
| |
| void ExternalMiniCluster::AssertNoCrashes() { |
| vector<ExternalDaemon*> daemons = this->daemons(); |
| int num_crashes = 0; |
| for (ExternalDaemon* d : daemons) { |
| if (d->IsShutdown()) continue; |
| if (!d->IsProcessAlive()) { |
| LOG(ERROR) << "Process with UUID " << d->uuid() << " has crashed"; |
| num_crashes++; |
| } |
| } |
| ASSERT_EQ(0, num_crashes) << "At least one process crashed"; |
| } |
| |
| Status ExternalMiniCluster::WaitForTabletsRunning( |
| ExternalTabletServer* ts, |
| int min_tablet_count, |
| const MonoDelta& timeout, |
| vector<TabletIdAndTableName>* tablets_info) { |
| TabletServerServiceProxy proxy(messenger_, ts->bound_rpc_addr(), ts->bound_rpc_addr().host()); |
| ListTabletsRequestPB req; |
| ListTabletsResponsePB resp; |
| |
| MonoTime deadline = MonoTime::Now() + timeout; |
| while (MonoTime::Now() < deadline) { |
| rpc::RpcController rpc; |
| rpc.set_timeout(MonoDelta::FromSeconds(10)); |
| RETURN_NOT_OK(proxy.ListTablets(req, &resp, &rpc)); |
| if (resp.has_error()) { |
| return StatusFromPB(resp.error().status()); |
| } |
| |
| bool all_running = true; |
| for (const StatusAndSchemaPB& status : resp.status_and_schema()) { |
| if (status.tablet_status().state() != tablet::RUNNING) { |
| all_running = false; |
| } |
| } |
| |
| // We're done if: |
| // 1. All the tablets are running, and |
| // 2. We've observed as many tablets as we had expected or more. |
| if (all_running && resp.status_and_schema_size() >= min_tablet_count) { |
| if (tablets_info) { |
| tablets_info->clear(); |
| const auto num_elems = resp.status_and_schema_size(); |
| tablets_info->reserve(num_elems); |
| for (auto i = 0; i < num_elems; ++i) { |
| const auto& elem = resp.status_and_schema(i); |
| tablets_info->emplace_back( |
| TabletIdAndTableName{elem.tablet_status().tablet_id(), |
| elem.tablet_status().table_name()}); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| } |
| |
| return Status::TimedOut(SecureDebugString(resp)); |
| } |
| |
| Status ExternalMiniCluster::GetLeaderMasterIndex(int* idx) { |
| scoped_refptr<ConnectToClusterRpc> rpc; |
| Synchronizer sync; |
| vector<pair<Sockaddr, string>> addrs_with_names; |
| Sockaddr leader_master_addr; |
| string leader_master_hostname; |
| MonoTime deadline = MonoTime::Now() + MonoDelta::FromSeconds(5); |
| |
| for (const scoped_refptr<ExternalMaster>& master : masters_) { |
| addrs_with_names.emplace_back(master->bound_rpc_addr(), master->bound_rpc_addr().host()); |
| } |
| const auto& cb = [&](const Status& status, |
| const pair<Sockaddr, string>& leader_master, |
| const master::ConnectToMasterResponsePB& /*resp*/) { |
| if (status.ok()) { |
| leader_master_addr = leader_master.first; |
| leader_master_hostname = leader_master.second; |
| } |
| sync.StatusCB(status); |
| }; |
| rpc::UserCredentials user_credentials; |
| RETURN_NOT_OK(user_credentials.SetLoggedInRealUser()); |
| rpc.reset(new ConnectToClusterRpc(cb, |
| std::move(addrs_with_names), |
| deadline, |
| MonoDelta::FromSeconds(5), |
| messenger_, |
| user_credentials)); |
| rpc->SendRpc(); |
| RETURN_NOT_OK(sync.Wait()); |
| bool found = false; |
| for (int i = 0; i < masters_.size(); i++) { |
| const auto& bound_hp = masters_[i]->bound_rpc_hostport(); |
| // If using BindMode::UNIQUE_LOOPBACK mode, in rare cases different masters |
| // might bind to different local IP addresses but use same port numbers. |
| // So, it's necessary to check both the returned hostnames and IP addresses |
| // to point to leader master. |
| if (bound_hp.port() == leader_master_addr.port() && |
| bound_hp.host() == leader_master_hostname) { |
| found = true; |
| *idx = i; |
| break; |
| } |
| } |
| if (!found) { |
| // There is never a situation where this should happen, so it's |
| // better to exit with a FATAL log message right away vs. return a |
| // Status::IllegalState(). |
| LOG(FATAL) << "Leader master is not in masters_"; |
| } |
| return Status::OK(); |
| } |
| |
| ExternalTabletServer* ExternalMiniCluster::tablet_server_by_uuid(const std::string& uuid) const { |
| for (const scoped_refptr<ExternalTabletServer>& ts : tablet_servers_) { |
| if (ts->instance_id().permanent_uuid() == uuid) { |
| return ts.get(); |
| } |
| } |
| return nullptr; |
| } |
| |
| int ExternalMiniCluster::tablet_server_index_by_uuid(const std::string& uuid) const { |
| for (int i = 0; i < tablet_servers_.size(); i++) { |
| if (tablet_servers_[i]->uuid() == uuid) { |
| return i; |
| } |
| } |
| return -1; |
| } |
| |
| vector<ExternalDaemon*> ExternalMiniCluster::daemons() const { |
| vector<ExternalDaemon*> results; |
| for (const scoped_refptr<ExternalTabletServer>& ts : tablet_servers_) { |
| results.push_back(ts.get()); |
| } |
| for (const scoped_refptr<ExternalMaster>& master : masters_) { |
| results.push_back(master.get()); |
| } |
| return results; |
| } |
| |
| #if !defined(NO_CHRONY) |
| vector<MiniChronyd*> ExternalMiniCluster::ntp_servers() const { |
| vector<MiniChronyd*> servers; |
| servers.reserve(ntp_servers_.size()); |
| for (const auto& server : ntp_servers_) { |
| DCHECK(server); |
| servers.emplace_back(server.get()); |
| } |
| return servers; |
| } |
| #endif // #if !defined(NO_CHRONY) ... |
| |
| vector<HostPort> ExternalMiniCluster::master_rpc_addrs() const { |
| vector<HostPort> master_hostports; |
| for (const auto& master : masters_) { |
| master_hostports.emplace_back(master->bound_rpc_hostport()); |
| } |
| return master_hostports; |
| } |
| |
| std::shared_ptr<rpc::Messenger> ExternalMiniCluster::messenger() const { |
| return messenger_; |
| } |
| |
| std::shared_ptr<MasterServiceProxy> ExternalMiniCluster::master_proxy() const { |
| CHECK_EQ(masters_.size(), 1); |
| return master_proxy(0); |
| } |
| |
| std::shared_ptr<MasterServiceProxy> ExternalMiniCluster::master_proxy(int idx) const { |
| CHECK_LT(idx, masters_.size()); |
| const auto& addr = CHECK_NOTNULL(master(idx))->bound_rpc_addr(); |
| return std::make_shared<MasterServiceProxy>(messenger_, addr, addr.host()); |
| } |
| |
| std::shared_ptr<TabletServerServiceProxy> ExternalMiniCluster::tserver_proxy(int idx) const { |
| CHECK_LT(idx, tablet_servers_.size()); |
| const auto& addr = CHECK_NOTNULL(tablet_server(idx))->bound_rpc_addr(); |
| return std::make_shared<TabletServerServiceProxy>(messenger_, addr, addr.host()); |
| } |
| |
| std::shared_ptr<TabletServerAdminServiceProxy> ExternalMiniCluster::tserver_admin_proxy( |
| int idx) const { |
| CHECK_LT(idx, tablet_servers_.size()); |
| const auto& addr = CHECK_NOTNULL(tablet_server(idx))->bound_rpc_addr(); |
| return std::make_shared<TabletServerAdminServiceProxy>(messenger_, addr, addr.host()); |
| } |
| |
| Status ExternalMiniCluster::CreateClient(client::KuduClientBuilder* builder, |
| client::sp::shared_ptr<client::KuduClient>* client) const { |
| client::KuduClientBuilder defaults; |
| if (builder == nullptr) { |
| builder = &defaults; |
| } |
| |
| CHECK(!masters_.empty()); |
| builder->clear_master_server_addrs(); |
| for (const scoped_refptr<ExternalMaster>& master : masters_) { |
| builder->add_master_server_addr(master->bound_rpc_hostport().ToString()); |
| } |
| builder->sasl_protocol_name(opts_.principal); |
| return builder->Build(client); |
| } |
| |
| Status ExternalMiniCluster::SetFlag(ExternalDaemon* daemon, |
| const string& flag, |
| const string& value) { |
| const auto& addr = daemon->bound_rpc_addr(); |
| server::GenericServiceProxy proxy(messenger_, addr, addr.host()); |
| |
| rpc::RpcController controller; |
| controller.set_timeout(MonoDelta::FromSeconds(30)); |
| server::SetFlagRequestPB req; |
| server::SetFlagResponsePB resp; |
| req.set_flag(flag); |
| req.set_value(value); |
| req.set_force(true); |
| RETURN_NOT_OK_PREPEND(proxy.SetFlag(req, &resp, &controller), |
| "rpc failed"); |
| if (resp.result() != server::SetFlagResponsePB::SUCCESS) { |
| return Status::RemoteError("failed to set flag", |
| SecureShortDebugString(resp)); |
| } |
| return Status::OK(); |
| } |
| |
| string ExternalMiniCluster::WalRootForTS(int ts_idx) const { |
| return tablet_server(ts_idx)->wal_dir(); |
| } |
| |
| string ExternalMiniCluster::UuidForTS(int ts_idx) const { |
| return tablet_server(ts_idx)->uuid(); |
| } |
| |
| Status ExternalMiniCluster::RemoveMaster(const HostPort& hp) { |
| for (auto it = masters_.begin(); it != masters_.end(); ++it) { |
| if ((*it)->bound_rpc_hostport() == hp) { |
| masters_.erase(it); |
| return Status::OK(); |
| } |
| } |
| return Status::NotFound(Substitute("Master $0 not found in ExternalMiniCluster", hp.ToString())); |
| } |
| |
| //------------------------------------------------------------ |
| // ExternalDaemon |
| //------------------------------------------------------------ |
| |
| ExternalDaemon::ExternalDaemon(ExternalDaemonOptions opts) |
| : opts_(std::move(opts)), |
| key_provider_(new DefaultKeyProvider()), |
| parent_tid_(std::this_thread::get_id()) { |
| CHECK(rpc_bind_address().Initialized()); |
| } |
| |
| ExternalDaemon::~ExternalDaemon() { |
| } |
| |
| std::vector<std::string> ExternalDaemon::GetDaemonFlags(const ExternalDaemonOptions& opts) { |
| const string info_path = JoinPathSegments(opts.data_dirs[0], "info.pb"); |
| vector<string> flags = { |
| // Basic flags for a Kudu server. |
| "--fs_wal_dir=" + opts.wal_dir, |
| "--fs_data_dirs=" + JoinStrings(opts.data_dirs, ","), |
| "--block_manager=" + opts.block_manager_type, |
| "--webserver_interface=localhost", |
| |
| // Disable fsync to dramatically speed up runtime. This is safe as no tests |
| // rely on forcefully cutting power to a machine or equivalent. |
| "--never_fsync", |
| |
| // Disable minidumps by default since many tests purposely inject faults. |
| "--enable_minidumps=false", |
| |
| // Disable redaction of the information in logs and Web UI. |
| "--redact=none", |
| |
| // Enable metrics logging. |
| "--metrics_log_interval_ms=1000", |
| |
| // Even if we are logging to stderr, metrics logs and minidumps end up being |
| // written based on -log_dir. So, we have to set that too. |
| "--log_dir=" + opts.log_dir, |
| |
| // Tell the server to dump its port information so we can pick it up. |
| "--server_dump_info_path=" + info_path, |
| "--server_dump_info_format=pb", |
| |
| // We use ephemeral ports in many tests. They don't work for production, |
| // but are OK in unit tests. |
| "--rpc_server_allow_ephemeral_ports", |
| |
| // Allow unsafe and experimental flags from tests, since we often use |
| // fault injection, etc. |
| "--unlock_experimental_flags", |
| "--unlock_unsafe_flags", |
| }; |
| |
| if (opts.logtostderr) { |
| // Ensure that logging goes to the test output and doesn't get buffered. |
| flags.emplace_back("--logtostderr"); |
| flags.emplace_back("--logbuflevel=-1"); |
| } |
| |
| if (opts.enable_encryption) { |
| flags.emplace_back("--encrypt_data_at_rest=true"); |
| if (opts.enable_ranger_kms) { |
| flags.emplace_back("--encryption_key_provider=ranger-kms"); |
| flags.emplace_back(Substitute("--encryption_cluster_key_name=$0", opts.ranger_cluster_key)); |
| flags.emplace_back(Substitute("--ranger_kms_url=$0", opts.ranger_kms_url)); |
| } |
| } |
| |
| // If large keys are not enabled. |
| if (!UseLargeKeys()) { |
| // Generate smaller RSA keys -- generating a 768-bit key is faster |
| // than generating the default 2048-bit key, and we don't care about |
| // strong encryption in tests. Setting it lower (e.g. 512 bits) results |
| // in OpenSSL errors RSA_sign:digest too big for rsa key:rsa_sign.c:122 |
| // since we are using strong/high TLS v1.2 cipher suites, so the minimum |
| // size of TLS-related RSA key is 768 bits (due to the usage of |
| // the ECDHE-RSA-AES256-GCM-SHA384 suite). |
| flags.emplace_back("--ipki_server_key_size=768"); |
| |
| // The RSA key of 768 bits is too short if OpenSSL security level is set to |
| // 1 or higher (applicable for OpenSSL 1.1.0 and newer). Lowering the |
| // security level to 0 makes possible ot use shorter keys in such cases. |
| flags.emplace_back("--openssl_security_level_override=0"); |
| } |
| |
| return flags; |
| } |
| |
| Status ExternalDaemon::StartProcess(const vector<string>& user_flags) { |
| CHECK(!process_); |
| const auto this_tid = std::this_thread::get_id(); |
| CHECK_EQ(parent_tid_, this_tid) |
| << "Process being started from thread " << this_tid << " which is different" |
| << " from the instantiating thread " << parent_tid_; |
| |
| RETURN_NOT_OK(env_util::CreateDirsRecursively(Env::Default(), log_dir())); |
| const string info_path = JoinPathSegments(data_dirs()[0], "info.pb"); |
| |
| vector<string> argv = { /* First the exe for argv[0] */ opts_.exe }; |
| vector<string> flags = GetDaemonFlags(opts_); |
| argv.insert(argv.end(), std::make_move_iterator(flags.begin()), |
| std::make_move_iterator(flags.end())); |
| |
| // Add all the flags coming from the minicluster framework. |
| argv.insert(argv.end(), user_flags.begin(), user_flags.end()); |
| |
| // Then the "extra flags" passed into the ctor (from the ExternalMiniCluster |
| // options struct). These come at the end so they can override things like |
| // web port or RPC bind address if necessary. |
| argv.insert(argv.end(), opts_.extra_flags.begin(), opts_.extra_flags.end()); |
| |
| // A previous instance of the daemon may have run in the same directory. So, remove |
| // the previous info file if it's there. |
| ignore_result(Env::Default()->DeleteFile(info_path)); |
| |
| // Start the daemon. |
| unique_ptr<Subprocess> p(new Subprocess(argv)); |
| p->SetEnvVars(extra_env_); |
| string env_str; |
| JoinMapKeysAndValues(extra_env_, "=", ",", &env_str); |
| LOG(INFO) << "Running " << opts_.exe << "\n" << JoinStrings(argv, "\n") |
| << " with env {" << env_str << "}"; |
| RETURN_NOT_OK_PREPEND(p->Start(), |
| Substitute("Failed to start subprocess $0", opts_.exe)); |
| |
| // If requested, start a monitoring subprocess. |
| unique_ptr<Subprocess> perf_record; |
| if (!opts_.perf_record_filename.empty()) { |
| perf_record.reset(new Subprocess({ |
| "perf", |
| "record", |
| "--call-graph", |
| "fp", |
| "-o", |
| opts_.perf_record_filename, |
| Substitute("--pid=$0", p->pid()) |
| }, SIGINT)); |
| RETURN_NOT_OK_PREPEND(perf_record->Start(), |
| "Could not start perf record subprocess"); |
| } |
| |
| // The process is now starting -- wait for the bound port info to show up. |
| Stopwatch sw; |
| sw.start(); |
| bool success = false; |
| while (sw.elapsed().wall_seconds() < opts_.start_process_timeout.ToSeconds()) { |
| if (Env::Default()->FileExists(info_path)) { |
| success = true; |
| break; |
| } |
| SleepFor(MonoDelta::FromMilliseconds(10)); |
| int wait_status; |
| Status s = p->WaitNoBlock(&wait_status); |
| if (s.IsTimedOut()) { |
| // The process is still running. |
| continue; |
| } |
| |
| // If the process exited with expected exit status we need to still swap() the process |
| // and exit as if it had succeeded. |
| if (WIFEXITED(wait_status) && WEXITSTATUS(wait_status) == fault_injection::kExitStatus) { |
| process_.swap(p); |
| perf_record_process_.swap(perf_record); |
| return Status::OK(); |
| } |
| |
| RETURN_NOT_OK_PREPEND(s, Substitute("Failed waiting on $0", opts_.exe)); |
| string exit_info; |
| RETURN_NOT_OK(p->GetExitStatus(nullptr, &exit_info)); |
| return Status::RuntimeError(exit_info); |
| } |
| |
| if (!success) { |
| ignore_result(p->Kill(SIGKILL)); |
| return Status::TimedOut( |
| Substitute("Timed out after $0 waiting for process ($1) to write info file ($2)", |
| opts_.start_process_timeout.ToString(), opts_.exe, info_path)); |
| } |
| |
| status_.reset(new ServerStatusPB()); |
| RETURN_NOT_OK_PREPEND(pb_util::ReadPBFromPath(Env::Default(), info_path, status_.get()), |
| "Failed to read info file from " + info_path); |
| LOG(INFO) << "Started " << opts_.exe << " as pid " << p->pid(); |
| VLOG(1) << opts_.exe << " instance information:\n" << SecureDebugString(*status_); |
| |
| process_.swap(p); |
| perf_record_process_.swap(perf_record); |
| return Status::OK(); |
| } |
| |
| Env* ExternalDaemon::env() const { |
| return Env::Default(); |
| } |
| |
| Status ExternalDaemon::SetServerKey() { |
| string path = JoinPathSegments(this->wal_dir(), "instance");; |
| LOG(INFO) << "Reading " << path; |
| InstanceMetadataPB instance; |
| RETURN_NOT_OK(pb_util::ReadPBContainerFromPath(env(), path, &instance, pb_util::NOT_SENSITIVE)); |
| if (!instance.server_key().empty()) { |
| string key; |
| RETURN_NOT_OK(key_provider_->DecryptServerKey(instance.server_key(), |
| instance.server_key_iv(), |
| instance.server_key_version(), |
| &key)); |
| LOG(INFO) << "Setting key " << key; |
| env()->SetEncryptionKey(reinterpret_cast<const uint8_t*>(a2b_hex(key).c_str()), key.size() * 4); |
| } |
| return Status::OK(); |
| } |
| |
| void ExternalDaemon::SetExePath(string exe) { |
| CHECK(IsShutdown()) << "Call Shutdown() before changing the executable path"; |
| opts_.exe = std::move(exe); |
| } |
| |
| void ExternalDaemon::SetMetastoreIntegration(const string& hms_uris, |
| bool enable_kerberos) { |
| opts_.extra_flags.emplace_back(Substitute("--hive_metastore_uris=$0", hms_uris)); |
| opts_.extra_flags.emplace_back(Substitute("--hive_metastore_sasl_enabled=$0", enable_kerberos)); |
| } |
| |
| Status ExternalDaemon::CreateKerberosConfig(MiniKdc* kdc, |
| const string& principal_base, |
| const string& bind_host, |
| vector<string>* flags, |
| map<string, string>* env_vars) { |
| string spn = principal_base + "/" + bind_host; |
| string ktpath; |
| RETURN_NOT_OK_PREPEND(kdc->CreateServiceKeytab(spn, &ktpath), |
| "could not create keytab"); |
| *env_vars = kdc->GetEnvVars(); |
| *flags = { |
| Substitute("--keytab_file=$0", ktpath), |
| Substitute("--principal=$0", spn), |
| "--rpc_authentication=required", |
| "--superuser_acl=test-admin", |
| "--user_acl=test-user", |
| }; |
| |
| return Status::OK(); |
| } |
| |
| Status ExternalDaemon::EnableKerberos(MiniKdc* kdc, const string& principal_base, |
| const string& bind_host) { |
| vector<string> flags; |
| RETURN_NOT_OK(CreateKerberosConfig(kdc, principal_base, bind_host, &flags, &extra_env_)); |
| // Insert Kerberos flags at the front of extra_flags, so that user specified |
| // flags will override them. |
| opts_.extra_flags.insert(opts_.extra_flags.begin(), std::make_move_iterator(flags.begin()), |
| std::make_move_iterator(flags.end())); |
| return Status::OK(); |
| } |
| |
| Status ExternalDaemon::Pause() { |
| if (!process_) { |
| return Status::IllegalState(Substitute( |
| "Request to pause '$0' but the process is not there", opts_.exe)); |
| } |
| VLOG(1) << "Pausing " << opts_.exe << " with pid " << process_->pid(); |
| const Status s = process_->Kill(SIGSTOP); |
| RETURN_NOT_OK(s); |
| paused_ = true; |
| return s; |
| } |
| |
| Status ExternalDaemon::Resume() { |
| if (!process_) { |
| return Status::IllegalState(Substitute( |
| "Request to resume '$0' but the process is not there", opts_.exe)); |
| } |
| VLOG(1) << "Resuming " << opts_.exe << " with pid " << process_->pid(); |
| const Status s = process_->Kill(SIGCONT); |
| RETURN_NOT_OK(s); |
| paused_ = false; |
| return s; |
| } |
| |
| bool ExternalDaemon::IsShutdown() const { |
| return !process_; |
| } |
| |
| bool ExternalDaemon::IsProcessAlive() const { |
| if (IsShutdown()) { |
| return false; |
| } |
| Status s = process_->WaitNoBlock(); |
| // If the non-blocking Wait "times out", that means the process |
| // is running. |
| return s.IsTimedOut(); |
| } |
| |
| Status ExternalDaemon::WaitForInjectedCrash(const MonoDelta& timeout) const { |
| return WaitForCrash(timeout, [](int status) { |
| return WIFEXITED(status) && WEXITSTATUS(status) == fault_injection::kExitStatus; |
| }, "fault injection"); |
| } |
| |
| Status ExternalDaemon::WaitForFatal(const MonoDelta& timeout) const { |
| return WaitForCrash(timeout, [](int status) { |
| return WIFSIGNALED(status) && WTERMSIG(status) == SIGABRT; |
| }, "FATAL crash"); |
| } |
| |
| |
| Status ExternalDaemon::WaitForCrash(const MonoDelta& timeout, |
| const std::function<bool(int)>& wait_status_predicate, |
| const char* crash_type_str) const { |
| CHECK(process_) << "process not started"; |
| MonoTime deadline = MonoTime::Now() + timeout; |
| |
| int i = 1; |
| while (IsProcessAlive() && MonoTime::Now() < deadline) { |
| int sleep_ms = std::min(i++ * 10, 200); |
| SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); |
| } |
| |
| if (IsProcessAlive()) { |
| return Status::TimedOut(Substitute("Process did not crash within $0", |
| timeout.ToString())); |
| } |
| |
| // If the process has exited, make sure it exited with the expected status. |
| int wait_status; |
| RETURN_NOT_OK_PREPEND(process_->WaitNoBlock(&wait_status), |
| "could not get wait status"); |
| |
| if (!wait_status_predicate(wait_status)) { |
| string info_str; |
| RETURN_NOT_OK_PREPEND(process_->GetExitStatus(nullptr, &info_str), |
| "could not get description of exit"); |
| return Status::Aborted( |
| Substitute("process exited, but not due to a $0: $1", crash_type_str, info_str)); |
| } |
| return Status::OK(); |
| } |
| |
| pid_t ExternalDaemon::pid() const { |
| return process_->pid(); |
| } |
| |
| Subprocess* ExternalDaemon::process() const { |
| return process_.get(); |
| } |
| |
| void ExternalDaemon::Shutdown() { |
| if (!process_) return; |
| |
| // Before we kill the process, store the addresses. If we're told to |
| // start again we'll reuse these. Store only the port if the |
| // daemons were using wildcard address for binding. |
| if (rpc_bind_address().host() != kWildcardIpAddr) { |
| bound_rpc_ = bound_rpc_hostport(); |
| bound_http_ = bound_http_hostport(); |
| } else { |
| bound_rpc_.set_host(kWildcardIpAddr); |
| bound_rpc_.set_port(bound_rpc_hostport().port()); |
| bound_http_.set_host(kWildcardIpAddr); |
| bound_http_.set_port(bound_http_hostport().port()); |
| } |
| |
| if (IsProcessAlive()) { |
| if (!paused_) { |
| // In coverage builds, ask the process nicely to flush coverage info |
| // before we kill -9 it. Otherwise, we never get any coverage from |
| // external clusters. |
| FlushCoverage(); |
| // Similarly, check for leaks in LSAN builds before killing. |
| CheckForLeaks(); |
| } |
| |
| LOG(INFO) << "Killing " << opts_.exe << " with pid " << process_->pid(); |
| ignore_result(process_->Kill(SIGKILL)); |
| } |
| WARN_NOT_OK(process_->Wait(), "Waiting on " + opts_.exe); |
| paused_ = false; |
| process_.reset(); |
| perf_record_process_.reset(); |
| } |
| |
| Status ExternalDaemon::DeleteFromDisk() const { |
| for (const string& data_dir : data_dirs()) { |
| RETURN_NOT_OK(Env::Default()->DeleteRecursively(data_dir)); |
| } |
| RETURN_NOT_OK(Env::Default()->DeleteRecursively(wal_dir())); |
| return Status::OK(); |
| } |
| |
| void ExternalDaemon::FlushCoverage() { |
| #ifndef COVERAGE_BUILD |
| return; // NOLINT(*) |
| #else |
| LOG(INFO) << "Attempting to flush coverage for " << opts_.exe << " pid " << process_->pid(); |
| server::GenericServiceProxy proxy( |
| opts_.messenger, bound_rpc_addr(), bound_rpc_addr().host()); |
| |
| server::FlushCoverageRequestPB req; |
| server::FlushCoverageResponsePB resp; |
| rpc::RpcController rpc; |
| |
| rpc.set_timeout(MonoDelta::FromMilliseconds(1000)); |
| Status s = proxy.FlushCoverage(req, &resp, &rpc); |
| if (s.ok() && !resp.success()) { |
| s = Status::RemoteError("Server does not appear to be running a coverage build"); |
| } |
| WARN_NOT_OK(s, Substitute("Unable to flush coverage on $0 pid $1", opts_.exe, process_->pid())); |
| #endif |
| } |
| |
| void ExternalDaemon::CheckForLeaks() { |
| #if defined(__has_feature) |
| # if __has_feature(address_sanitizer) |
| LOG(INFO) << "Attempting to check leaks for " << opts_.exe << " pid " << process_->pid(); |
| server::GenericServiceProxy proxy(opts_.messenger, bound_rpc_addr(), bound_rpc_addr().host()); |
| |
| server::CheckLeaksRequestPB req; |
| server::CheckLeaksResponsePB resp; |
| rpc::RpcController rpc; |
| |
| rpc.set_timeout(MonoDelta::FromMilliseconds(1000)); |
| Status s = proxy.CheckLeaks(req, &resp, &rpc); |
| if (s.ok()) { |
| if (!resp.success()) { |
| s = Status::RemoteError("Server does not appear to be running an LSAN build"); |
| } else { |
| CHECK(!resp.found_leaks()) << "Found leaks in " << opts_.exe << " pid " << process_->pid(); |
| } |
| } |
| WARN_NOT_OK(s, Substitute("Unable to check leaks on $0 pid $1", opts_.exe, process_->pid())); |
| # endif |
| #endif |
| } |
| |
| HostPort ExternalDaemon::bound_rpc_hostport() const { |
| CHECK(status_); |
| CHECK_GE(status_->bound_rpc_addresses_size(), 1); |
| return HostPortFromPB(status_->bound_rpc_addresses(0)); |
| } |
| |
| Sockaddr ExternalDaemon::bound_rpc_addr() const { |
| HostPort hp = bound_rpc_hostport(); |
| vector<Sockaddr> addrs; |
| CHECK_OK(hp.ResolveAddresses(&addrs)); |
| CHECK(!addrs.empty()); |
| return addrs[0]; |
| } |
| |
| HostPort ExternalDaemon::bound_http_hostport() const { |
| CHECK(status_); |
| if (status_->bound_http_addresses_size() == 0) { |
| return HostPort(); |
| } |
| return HostPortFromPB(status_->bound_http_addresses(0)); |
| } |
| |
| const NodeInstancePB& ExternalDaemon::instance_id() const { |
| CHECK(status_); |
| return status_->node_instance(); |
| } |
| |
| const string& ExternalDaemon::uuid() const { |
| CHECK(status_); |
| return status_->node_instance().permanent_uuid(); |
| } |
| |
| //------------------------------------------------------------ |
| // ScopedResumeExternalDaemon |
| //------------------------------------------------------------ |
| |
| ScopedResumeExternalDaemon::ScopedResumeExternalDaemon(ExternalDaemon* daemon) |
| : daemon_(CHECK_NOTNULL(daemon)) { |
| } |
| |
| ScopedResumeExternalDaemon::~ScopedResumeExternalDaemon() { |
| WARN_NOT_OK(daemon_->Resume(), "Could not resume external daemon"); |
| } |
| |
| //------------------------------------------------------------ |
| // ExternalMaster |
| //------------------------------------------------------------ |
| |
| ExternalMaster::ExternalMaster(ExternalDaemonOptions opts) |
| : ExternalDaemon(std::move(opts)), |
| env_(Env::NewEnv()) { |
| } |
| |
| ExternalMaster::~ExternalMaster() { |
| } |
| |
| Status ExternalMaster::Start() { |
| vector<string> flags { "master", "run" }; |
| auto common_flags = GetCommonFlags(rpc_bind_address()); |
| flags.insert(flags.end(), std::make_move_iterator(common_flags.begin()), |
| std::make_move_iterator(common_flags.end())); |
| return StartProcess(flags); |
| } |
| |
| Status ExternalMaster::Restart() { |
| // We store the addresses on shutdown so make sure we did that first. |
| if (bound_rpc_.port() == 0) { |
| return Status::IllegalState("Master cannot be restarted. Must call Shutdown() first."); |
| } |
| |
| vector<string> flags { "master", "run" }; |
| auto common_flags = GetCommonFlags(bound_rpc_, bound_http_); |
| flags.insert(flags.end(), std::make_move_iterator(common_flags.begin()), |
| std::make_move_iterator(common_flags.end())); |
| return StartProcess(flags); |
| } |
| |
| Status ExternalMaster::WaitForCatalogManager(WaitMode wait_mode) { |
| unique_ptr<MasterServiceProxy> proxy(new MasterServiceProxy( |
| opts_.messenger, bound_rpc_addr(), bound_rpc_addr().host())); |
| Stopwatch sw; |
| sw.start(); |
| while (sw.elapsed().wall_seconds() < kMasterCatalogManagerTimeoutSeconds) { |
| ListTablesRequestPB req; |
| ListTablesResponsePB resp; |
| RpcController rpc; |
| Status s = proxy->ListTables(req, &resp, &rpc); |
| if (s.IsRemoteError()) { |
| auto* err = rpc.error_response(); |
| if (err && err->has_code()) { |
| switch (err->code()) { |
| case rpc::ErrorStatusPB::ERROR_SERVER_TOO_BUSY: |
| case rpc::ErrorStatusPB::ERROR_UNAVAILABLE: |
| continue; |
| default: |
| return s; |
| } |
| } |
| return s; |
| } |
| if (s.ok()) { |
| if (!resp.has_error()) { |
| // This master is the leader and is up and running. |
| break; |
| } |
| s = StatusFromPB(resp.error().status()); |
| if (s.IsIllegalState()) { |
| if (wait_mode == DONT_WAIT_FOR_LEADERSHIP) { |
| // This master is not the leader but is otherwise up and running. |
| break; |
| } |
| DCHECK_EQ(wait_mode, WAIT_FOR_LEADERSHIP); |
| // Continue to the sleep below. |
| } else if (!s.IsServiceUnavailable()) { |
| // Unexpected error from master. |
| return s; |
| } |
| } else if (!s.IsTimedOut() && !s.IsNetworkError()) { |
| // Unexpected error from proxy. |
| return s; |
| } |
| |
| // There was some kind of transient network error or the master isn't yet |
| // ready. Sleep and retry. |
| SleepFor(MonoDelta::FromMilliseconds(50)); |
| } |
| if (sw.elapsed().wall_seconds() > kMasterCatalogManagerTimeoutSeconds) { |
| return Status::TimedOut( |
| Substitute("Timed out after $0s waiting for master ($1) startup", |
| kMasterCatalogManagerTimeoutSeconds, |
| bound_rpc_addr().ToString())); |
| } |
| return Status::OK(); |
| } |
| |
| vector<string> ExternalMaster::GetCommonFlags(const HostPort& rpc_bind_addr, |
| const HostPort& http_addr) { |
| vector<string> flags; |
| if (!UseLargeKeys()) { |
| // See the in-line comment for "--ipki_server_key_size" flag in |
| // ExternalDaemon::StartProcess() method. |
| flags.emplace_back("--ipki_ca_key_size=768"); |
| // As for the TSK keys, 512 bits is the minimum since we are using |
| // SHA256 digest for token signing/verification. |
| flags.emplace_back("--tsk_num_rsa_bits=512"); |
| } |
| |
| flags.emplace_back(Substitute("--rpc_bind_addresses=$0", rpc_bind_addr.ToString())); |
| |
| if (http_addr.Initialized()) { |
| flags.emplace_back(Substitute("--webserver_interface=$0", http_addr.host())); |
| flags.emplace_back(Substitute("--webserver_port=$0", http_addr.port())); |
| } else { |
| flags.emplace_back(Substitute("--webserver_interface=$0", rpc_bind_addr.host())); |
| flags.emplace_back("--webserver_port=0"); |
| } |
| |
| return flags; |
| } |
| |
| vector<string> ExternalMaster::GetMasterFlags(const ExternalDaemonOptions& opts) { |
| vector<string> flags(ExternalDaemon::GetDaemonFlags(opts)); |
| auto common_flags = GetCommonFlags(opts.rpc_bind_address); |
| flags.insert(flags.end(), std::make_move_iterator(common_flags.begin()), |
| std::make_move_iterator(common_flags.end())); |
| return flags; |
| } |
| |
| //------------------------------------------------------------ |
| // ExternalTabletServer |
| //------------------------------------------------------------ |
| |
| ExternalTabletServer::ExternalTabletServer(ExternalDaemonOptions opts, |
| vector<HostPort> master_addrs) |
| : ExternalDaemon(std::move(opts)), |
| master_addrs_(std::move(master_addrs)), |
| env_(Env::NewEnv()) { |
| DCHECK(!master_addrs_.empty()); |
| } |
| |
| ExternalTabletServer::~ExternalTabletServer() { |
| } |
| |
| Status ExternalTabletServer::Start() { |
| vector<string> flags { |
| "tserver", "run", |
| Substitute("--rpc_bind_addresses=$0", rpc_bind_address().ToString()), |
| Substitute("--local_ip_for_outbound_sockets=$0", rpc_bind_address().host()), |
| Substitute("--webserver_interface=$0", rpc_bind_address().host()), |
| "--webserver_port=0", |
| Substitute("--tserver_master_addrs=$0", |
| HostPort::ToCommaSeparatedString(master_addrs_)), |
| }; |
| return StartProcess(flags); |
| } |
| |
| Status ExternalTabletServer::Restart() { |
| // We store the addresses on shutdown so make sure we did that first. |
| if (bound_rpc_.port() == 0) { |
| return Status::IllegalState("Tablet server cannot be restarted. Must call Shutdown() first."); |
| } |
| vector<string> flags { |
| "tserver", "run", |
| Substitute("--rpc_bind_addresses=$0", bound_rpc_.ToString()), |
| Substitute("--local_ip_for_outbound_sockets=$0", rpc_bind_address().host()), |
| Substitute("--tserver_master_addrs=$0", |
| HostPort::ToCommaSeparatedString(master_addrs_)), |
| }; |
| if (bound_http_.Initialized()) { |
| flags.push_back(Substitute("--webserver_port=$0", bound_http_.port())); |
| flags.push_back(Substitute("--webserver_interface=$0", |
| bound_http_.host())); |
| } |
| return StartProcess(flags); |
| } |
| |
| } // namespace cluster |
| } // namespace kudu |