blob: 11153aa984112e683a8a1b9308d72ebc6d8a2a51 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tools/tool_action.h"
#include <unistd.h>
#include <cstdlib>
#include <map>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/common.h>
#include <google/protobuf/stubs/status.h>
#include <google/protobuf/stubs/stringpiece.h>
#include <google/protobuf/util/json_util.h>
#include "kudu/common/common.pb.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/security/test/mini_kdc.h"
#include "kudu/subprocess/subprocess_protocol.h"
#include "kudu/tools/tool.pb.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
#include "kudu/util/string_case.h"
// --serialization selects the wire format the control shell speaks with its
// parent process; see the help text below for the two supported encodings.
DEFINE_string(serialization, "json", "Serialization method to be used by the "
              "control shell. Valid values are 'json' (protobuf serialized "
              "into JSON and terminated with a newline character) or 'pb' "
              "(four byte protobuf message length in big endian followed by "
              "the protobuf message itself).");

// Flag validator: accept the value only if kudu::iequals() matches it against
// one of the two supported modes (presumably a case-insensitive comparison --
// see kudu/util/string_case.h).
DEFINE_validator(serialization, [](const char* /*flag_name*/, const std::string& value) {
  const bool is_pb = kudu::iequals(value, "pb");
  const bool is_json = kudu::iequals(value, "json");
  return is_pb || is_json;
});
using kudu::cluster::ExternalDaemon;
using kudu::cluster::ExternalMiniCluster;
using kudu::cluster::ExternalMiniClusterOptions;
using kudu::subprocess::SubprocessProtocol;
using std::string;
using std::unique_ptr;
using strings::Substitute;
namespace kudu {
namespace tools {
namespace {
// Creates the directory that will hold the mini cluster's data and stores its
// path in '*cluster_root'.
//
// The ExternalMiniCluster cannot pick a cluster root for us since we are not
// running inside a gtest, so we mimic what the Java external mini cluster did
// for a long time: use $TEST_TMPDIR if it is set, otherwise a per-uid
// directory under /tmp, and append "minicluster-data".
//
// Returns a non-OK status (leaving '*cluster_root' untouched) if the directory
// hierarchy cannot be created.
Status MakeClusterRoot(string* cluster_root) {
  string base_dir;
  if (const char* env_tmpdir = getenv("TEST_TMPDIR")) {
    base_dir = env_tmpdir;
  } else {
    base_dir = Substitute("/tmp/kudutest-$0", getuid());
  }

  string data_dir = JoinPathSegments(base_dir, "minicluster-data");
  RETURN_NOT_OK(env_util::CreateDirsRecursively(Env::Default(), data_dir));
  *cluster_root = std::move(data_dir);
  return Status::OK();
}
// Returns OK if 'cluster' holds a cluster object, or NotFound if the pointer
// is empty (i.e. no cluster has been created yet or it has been destroyed).
Status CheckClusterExists(const unique_ptr<ExternalMiniCluster>& cluster) {
  return cluster ? Status::OK() : Status::NotFound("cluster not found");
}
// Resolves the daemon identified by 'id' within 'cluster'.
//
// On success exactly one of the out-parameters is non-null: '*daemon' for a
// master or tablet server (with '*kdc' set to nullptr), or '*kdc' for the KDC
// (with '*daemon' set to nullptr). On failure neither out-parameter is
// written.
//
// Returns:
//   NotFound        - there is no cluster, no KDC, or the index is out of range.
//   InvalidArgument - the type or (for master/tserver) the index is missing,
//                     or the type is not one of MASTER, TSERVER, KDC.
Status FindDaemon(const unique_ptr<ExternalMiniCluster>& cluster,
                  const DaemonIdentifierPB& id,
                  ExternalDaemon** daemon,
                  MiniKdc** kdc) {
  RETURN_NOT_OK(CheckClusterExists(cluster));
  if (!id.has_type()) {
    return Status::InvalidArgument("request is missing daemon type");
  }

  const auto type = id.type();

  // The KDC is a singleton, so it is addressed by type alone.
  if (type == KDC) {
    MiniKdc* cluster_kdc = cluster->kdc();
    if (!cluster_kdc) {
      return Status::NotFound("kdc not found");
    }
    *daemon = nullptr;
    *kdc = cluster_kdc;
    return Status::OK();
  }

  if (type != MASTER && type != TSERVER) {
    return Status::InvalidArgument(
        Substitute("unknown daemon type: $0", DaemonType_Name(type)));
  }

  // Masters and tablet servers are addressed by type plus index.
  if (!id.has_index()) {
    return Status::InvalidArgument("request is missing daemon index");
  }
  const auto idx = id.index();
  if (type == MASTER) {
    if (idx >= cluster->num_masters()) {
      return Status::NotFound(Substitute("no master with index $0", idx));
    }
    *daemon = cluster->master(idx);
  } else {
    if (idx >= cluster->num_tablet_servers()) {
      return Status::NotFound(Substitute("no tserver with index $0", idx));
    }
    *daemon = cluster->tablet_server(idx);
  }
  *kdc = nullptr;
  return Status::OK();
}
// Executes a single control shell request.
//
// 'req' is the decoded request; the populated member of its 'request' oneof
// selects the operation. 'resp' receives any payload the operation produces
// (only the get_masters, get_tservers and get_kdc_env_vars operations write to
// it). 'cluster' is the shell's cluster slot: create_cluster fills it,
// destroy_cluster empties it, and every other operation requires it to be
// populated.
//
// Returns OK on success. Returns InvalidArgument for malformed or unsupported
// requests, NotFound when the cluster, the KDC or the addressed daemon does
// not exist, or whatever error the underlying cluster operation reported.
Status ProcessRequest(const ControlShellRequestPB& req,
                      ControlShellResponsePB* resp,
                      unique_ptr<ExternalMiniCluster>* cluster) {
  switch (req.request_case()) {
    case ControlShellRequestPB::kCreateCluster:
    {
      if (*cluster) {
        return Status::InvalidArgument("cluster already created");
      }
      const CreateClusterRequestPB& cc = req.create_cluster();
      ExternalMiniClusterOptions opts;
      if (cc.has_num_masters()) {
        if (cc.num_masters() != 1 && cc.num_masters() != 3) {
          return Status::InvalidArgument(
              "only one or three masters are supported");
        }
        opts.num_masters = cc.num_masters();
      }
      if (cc.has_num_tservers()) {
        opts.num_tablet_servers = cc.num_tservers();
      }
      opts.enable_kerberos = cc.enable_kerberos();
      opts.hms_mode = cc.hms_mode();
      opts.enable_ranger = cc.enable_ranger();
      // Use the caller's cluster root if one was given; otherwise create one.
      if (cc.has_cluster_root()) {
        opts.cluster_root = cc.cluster_root();
      } else {
        RETURN_NOT_OK(MakeClusterRoot(&opts.cluster_root));
      }
      opts.extra_master_flags.assign(cc.extra_master_flags().begin(),
                                     cc.extra_master_flags().end());
      opts.extra_tserver_flags.assign(cc.extra_tserver_flags().begin(),
                                      cc.extra_tserver_flags().end());
      if (opts.enable_kerberos) {
        opts.mini_kdc_options.data_root = JoinPathSegments(opts.cluster_root, "krb5kdc");
        opts.mini_kdc_options.ticket_lifetime = cc.mini_kdc_options().ticket_lifetime();
        opts.mini_kdc_options.renew_lifetime = cc.mini_kdc_options().renew_lifetime();
        if (cc.has_principal()) {
          opts.principal = cc.principal();
        }
      }
      // Only the cluster object is constructed here; start_cluster is a
      // separate request.
      cluster->reset(new ExternalMiniCluster(std::move(opts)));
      break;
    }
    case ControlShellRequestPB::kDestroyCluster:
    {
      RETURN_NOT_OK(CheckClusterExists(*cluster));
      cluster->reset();
      break;
    }
    case ControlShellRequestPB::kStartCluster:
    {
      RETURN_NOT_OK(CheckClusterExists(*cluster));
      // NOTE(review): a non-zero master count is taken to mean the cluster has
      // been started before, in which case it is restarted rather than
      // started from scratch -- confirm against ExternalMiniCluster.
      if ((*cluster)->num_masters() != 0) {
        DCHECK_GT((*cluster)->num_tablet_servers(), 0);
        RETURN_NOT_OK((*cluster)->Restart());
      } else {
        RETURN_NOT_OK((*cluster)->Start());
      }
      break;
    }
    case ControlShellRequestPB::kStopCluster:
    {
      RETURN_NOT_OK(CheckClusterExists(*cluster));
      (*cluster)->Shutdown();
      break;
    }
    case ControlShellRequestPB::kStartDaemon:
    {
      if (!req.start_daemon().has_id()) {
        return Status::InvalidArgument("missing process id");
      }
      ExternalDaemon* daemon;
      MiniKdc* kdc;
      // FindDaemon() also verifies that the cluster exists.
      RETURN_NOT_OK(FindDaemon(*cluster, req.start_daemon().id(), &daemon, &kdc));
      if (daemon) {
        DCHECK(!kdc);
        RETURN_NOT_OK(daemon->Restart());
      } else {
        DCHECK(kdc);
        RETURN_NOT_OK(kdc->Start());
      }
      break;
    }
    case ControlShellRequestPB::kStopDaemon:
    {
      if (!req.stop_daemon().has_id()) {
        return Status::InvalidArgument("missing process id");
      }
      ExternalDaemon* daemon;
      MiniKdc* kdc;
      RETURN_NOT_OK(FindDaemon(*cluster, req.stop_daemon().id(), &daemon, &kdc));
      if (daemon) {
        DCHECK(!kdc);
        daemon->Shutdown();
      } else {
        DCHECK(kdc);
        RETURN_NOT_OK(kdc->Stop());
      }
      break;
    }
    case ControlShellRequestPB::kGetMasters:
    {
      RETURN_NOT_OK(CheckClusterExists(*cluster));
      for (int i = 0; i < (*cluster)->num_masters(); i++) {
        HostPortPB pb = HostPortToPB((*cluster)->master(i)->bound_rpc_hostport());
        DaemonInfoPB* info = resp->mutable_get_masters()->mutable_masters()->Add();
        info->mutable_id()->set_type(MASTER);
        info->mutable_id()->set_index(i);
        *info->mutable_bound_rpc_address() = std::move(pb);
      }
      break;
    }
    case ControlShellRequestPB::kGetTservers:
    {
      RETURN_NOT_OK(CheckClusterExists(*cluster));
      for (int i = 0; i < (*cluster)->num_tablet_servers(); i++) {
        HostPortPB pb = HostPortToPB((*cluster)->tablet_server(i)->bound_rpc_hostport());
        DaemonInfoPB* info = resp->mutable_get_tservers()->mutable_tservers()->Add();
        info->mutable_id()->set_type(TSERVER);
        info->mutable_id()->set_index(i);
        *info->mutable_bound_rpc_address() = std::move(pb);
      }
      break;
    }
    case ControlShellRequestPB::kGetKdcEnvVars:
    {
      // The cluster must be checked before it is dereferenced: this request
      // may arrive before create_cluster or after destroy_cluster.
      RETURN_NOT_OK(CheckClusterExists(*cluster));
      if (!(*cluster)->kdc()) {
        return Status::NotFound("kdc not found");
      }
      auto env_vars = (*cluster)->kdc()->GetEnvVars();
      resp->mutable_get_kdc_env_vars()->mutable_env_vars()->insert(
          env_vars.begin(), env_vars.end());
      break;
    }
    case ControlShellRequestPB::kKdestroy:
    {
      // See kGetKdcEnvVars: guard against a missing cluster.
      RETURN_NOT_OK(CheckClusterExists(*cluster));
      if (!(*cluster)->kdc()) {
        return Status::NotFound("kdc not found");
      }
      RETURN_NOT_OK((*cluster)->kdc()->Kdestroy());
      break;
    }
    case ControlShellRequestPB::kKinit:
    {
      // See kGetKdcEnvVars: guard against a missing cluster.
      RETURN_NOT_OK(CheckClusterExists(*cluster));
      if (!(*cluster)->kdc()) {
        return Status::NotFound("kdc not found");
      }
      RETURN_NOT_OK((*cluster)->kdc()->Kinit(req.kinit().username()));
      break;
    }
    case ControlShellRequestPB::kSetDaemonFlag:
    {
      const auto& r = req.set_daemon_flag();
      if (!r.has_id()) {
        return Status::InvalidArgument("missing process id");
      }
      const auto& id = r.id();
      if (id.type() == DaemonType::KDC) {
        return Status::InvalidArgument("mini-KDC doesn't support SetFlag()");
      }
      ExternalDaemon* daemon;
      MiniKdc* kdc;
      // The KDC was rejected above, so a successful lookup yields a daemon.
      RETURN_NOT_OK(FindDaemon(*cluster, id, &daemon, &kdc));
      DCHECK(daemon);
      RETURN_NOT_OK((*cluster)->SetFlag(daemon, r.flag(), r.value()));
      break;
    }
    case ControlShellRequestPB::kPauseDaemon:
    {
      const auto& r = req.pause_daemon();
      if (!r.has_id()) {
        return Status::InvalidArgument("missing process id");
      }
      const auto& id = r.id();
      if (id.type() == DaemonType::KDC) {
        return Status::InvalidArgument("mini-KDC doesn't support pausing");
      }
      ExternalDaemon* daemon;
      MiniKdc* kdc;
      RETURN_NOT_OK(FindDaemon(*cluster, id, &daemon, &kdc));
      DCHECK(daemon);
      RETURN_NOT_OK(daemon->Pause());
      break;
    }
    case ControlShellRequestPB::kResumeDaemon:
    {
      const auto& r = req.resume_daemon();
      if (!r.has_id()) {
        return Status::InvalidArgument("missing process id");
      }
      const auto& id = r.id();
      if (id.type() == DaemonType::KDC) {
        return Status::InvalidArgument("mini-KDC doesn't support resuming");
      }
      ExternalDaemon* daemon;
      MiniKdc* kdc;
      RETURN_NOT_OK(FindDaemon(*cluster, id, &daemon, &kdc));
      DCHECK(daemon);
      RETURN_NOT_OK(daemon->Resume());
      break;
    }
    default:
      return Status::InvalidArgument("unknown cluster control request");
  }
  return Status::OK();
}
// Action entry point for the control shell: reads requests from stdin,
// executes them against a single mini cluster, and writes one response per
// request until the input reaches end-of-file.
//
// Returns OK on a normal exit (end-of-file on either direction of the
// channel). An IO error while receiving, or any other error while sending,
// is returned to the caller immediately.
Status RunControlShell(const RunnerContext& /*context*/) {
  // The shell's parent talks to us over stdin and stdout, so nothing we spawn
  // may ever write to stdout. Keep a private duplicate of the original stdout
  // for the protocol and then point fd 1 at stderr.
  int new_stdout;
  RETRY_ON_EINTR(new_stdout, dup(STDOUT_FILENO));
  CHECK_ERR(new_stdout);
  int ret;
  RETRY_ON_EINTR(ret, dup2(STDERR_FILENO, STDOUT_FILENO));
  PCHECK(ret == STDOUT_FILENO);

  // Translate the --serialization flag into the protocol's mode.
  auto serde_mode = SubprocessProtocol::SerializationMode::JSON;
  if (!iequals(FLAGS_serialization, "json")) {
    DCHECK(iequals(FLAGS_serialization, "pb"));
    serde_mode = SubprocessProtocol::SerializationMode::PB;
  }
  SubprocessProtocol protocol(serde_mode,
                              SubprocessProtocol::CloseMode::NO_CLOSE_ON_DESTROY,
                              STDIN_FILENO,
                              new_stdout);

  // Shell loop: one request in, one response out.
  unique_ptr<ExternalMiniCluster> cluster;
  for (;;) {
    // Block until the next request arrives. End-of-file ends the loop, an IO
    // error is fatal, and any other failure is reported back in the response.
    ControlShellRequestPB req;
    Status s = protocol.ReceiveMessage(&req);
    if (s.IsEndOfFile()) {
      break;
    }
    if (s.IsIOError()) {
      return s;
    }

    // From here on a response is always sent.
    ControlShellResponsePB resp;
    if (s.ok()) {
      s = ProcessRequest(req, &resp, &cluster);
    }
    if (!s.ok()) {
      // Carries the failure of ReceiveMessage() or of ProcessRequest(),
      // whichever happened first.
      StatusToPB(s, resp.mutable_error());
    }

    // Every send error other than end-of-file is fatal.
    s = protocol.SendMessage(resp);
    if (s.IsEndOfFile()) {
      break;
    }
    RETURN_NOT_OK(s);
  }

  // Normal exit: stop the cluster, if any, and remove its root directory.
  if (!cluster) {
    return Status::OK();
  }
  cluster->Shutdown();
  WARN_NOT_OK(Env::Default()->DeleteRecursively(cluster->cluster_root()),
              "Could not delete cluster root");
  return Status::OK();
}
// Renders 'req' as a JSON string using protobuf's JSON utilities.
//
// Serialization failure is treated as a programming error: the process is
// aborted via CHECK with a message that includes the protobuf error text and
// a debug dump of the request.
string SerializeRequest(const ControlShellRequestPB& req) {
  string json;
  const auto google_status =
      google::protobuf::util::MessageToJsonString(req, &json);
  CHECK(google_status.ok()) << Substitute(
      "unable to serialize JSON ($0): $1",
      google_status.error_message().ToString(),
      pb_util::SecureDebugString(req));
  return json;
}
} // anonymous namespace
// Builds the "test" mode of the CLI, which currently exposes a single action,
// "mini_cluster", backed by RunControlShell().
unique_ptr<Mode> BuildTestMode() {
  // Sample requests embedded in the action's help text.
  ControlShellRequestPB create_req;
  create_req.mutable_create_cluster()->set_num_tservers(3);
  ControlShellRequestPB start_req;
  start_req.mutable_start_cluster();

  const string help_text = Substitute(
      "The protocol for the control shell is protobuf-based and is documented "
      "in src/kudu/tools/tool.proto. It is currently considered to be highly "
      "experimental and subject to change.\n"
      "\n"
      "Example JSON input to create and start a cluster:\n"
      "    $0\n"
      "    $1\n",
      SerializeRequest(create_req),
      SerializeRequest(start_req));

  return ModeBuilder("test")
      .Description("Various test actions")
      .AddAction(ActionBuilder("mini_cluster", &RunControlShell)
                     .Description("Spawn a control shell for running a mini-cluster")
                     .ExtraDescription(help_text)
                     .AddOptionalParameter("serialization")
                     .Build())
      .Build();
}
} // namespace tools
} // namespace kudu