blob: 200505938b0a97ca780cb5b1be6be8b94a6fa631 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tools/tool_action_common.h"
#include <unistd.h>
#include <algorithm>
#include <cerrno>
#include <cstddef>
#include <iomanip>
#include <iostream>
#include <iterator>
#include <memory>
#include <numeric>
#include <string>
#include <unordered_map>
#include <vector>
#include <boost/algorithm/string/predicate.hpp>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <google/protobuf/util/json_util.h>
#include "kudu/client/client-internal.h" // IWYU pragma: keep
#include "kudu/client/client.h"
#include "kudu/client/shared_ptr.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus.proxy.h" // IWYU pragma: keep
#include "kudu/consensus/log.pb.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/gutil/endian.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/numbers.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/stringpiece.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/master/master.proxy.h" // IWYU pragma: keep
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/server/server_base.pb.h"
#include "kudu/server/server_base.proxy.h"
#include "kudu/tools/tool.pb.h" // IWYU pragma: keep
#include "kudu/tools/tool_action.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.proxy.h" // IWYU pragma: keep
#include "kudu/tserver/tserver_service.proxy.h" // IWYU pragma: keep
#include "kudu/util/faststring.h"
#include "kudu/util/jsonwriter.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
DEFINE_bool(force, false, "If true, allows the set_flag command to set a flag "
"which is not explicitly marked as runtime-settable. Such flag "
"changes may be simply ignored on the server, or may cause the "
"server to crash.");
DEFINE_bool(print_meta, true, "Include metadata in output");
DEFINE_string(print_entries, "decoded",
"How to print entries:\n"
" false|0|no = don't print\n"
" true|1|yes|decoded = print them decoded\n"
" pb = print the raw protobuf\n"
" id = print only their ids");
DEFINE_string(table_name, "",
"Restrict output to a specific table by name");
DEFINE_int64(timeout_ms, 1000 * 60, "RPC timeout in milliseconds");
DEFINE_int32(truncate_data, 100,
"Truncate the data fields to the given number of bytes "
"before printing. Set to 0 to disable");
DEFINE_string(columns, "", "Comma-separated list of column fields to include in output tables");
DEFINE_string(format, "pretty",
"Format to use for printing list output tables.\n"
"Possible values: pretty, space, tsv, csv, and json");
DEFINE_string(flag_tags, "", "Comma-separated list of tags used to restrict which "
"flags are returned. An empty value matches all tags");
DEFINE_bool(all_flags, false, "Whether to return all flags, or only flags that "
"were explicitly set.");
DEFINE_string(tables, "", "Tables to include (comma-separated list of table names)"
"If not specified, includes all tables.");
namespace boost {
template <typename Signature>
class function;
} // namespace boost
namespace kudu {
namespace master {
class ListMastersRequestPB;
class ListMastersResponsePB;
class ListTabletServersRequestPB;
class ListTabletServersResponsePB;
class ReplaceTabletRequestPB;
class ReplaceTabletResponsePB;
} // namespace master
namespace tools {
using client::KuduClient;
using client::KuduClientBuilder;
using client::KuduTablet;
using client::KuduTabletServer;
using consensus::ConsensusServiceProxy;
using consensus::ReplicateMsg;
using log::LogEntryPB;
using log::LogEntryReader;
using log::ReadableLogSegment;
using master::ListMastersRequestPB;
using master::ListMastersResponsePB;
using master::ListTabletServersRequestPB;
using master::ListTabletServersResponsePB;
using master::MasterServiceProxy;
using master::ReplaceTabletRequestPB;
using master::ReplaceTabletResponsePB;
using pb_util::SecureDebugString;
using pb_util::SecureShortDebugString;
using rpc::Messenger;
using rpc::MessengerBuilder;
using rpc::RequestIdPB;
using rpc::RpcController;
using server::GenericServiceProxy;
using server::GetFlagsRequestPB;
using server::GetFlagsResponsePB;
using server::GetStatusRequestPB;
using server::GetStatusResponsePB;
using server::ServerClockRequestPB;
using server::ServerClockResponsePB;
using server::ServerStatusPB;
using server::SetFlagRequestPB;
using server::SetFlagResponsePB;
using std::cout;
using std::endl;
using std::ostream;
using std::setfill;
using std::setw;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
using tserver::TabletServerAdminServiceProxy;
using tserver::TabletServerServiceProxy;
using tserver::WriteRequestPB;
const char* const kMasterAddressesArg = "master_addresses";
const char* const kMasterAddressesArgDesc = "Comma-separated list of Kudu "
"Master addresses where each address is of form 'hostname:port'";
const char* const kTabletIdArg = "tablet_id";
const char* const kTabletIdArgDesc = "Tablet Identifier";
namespace {
enum PrintEntryType {
DONT_PRINT,
PRINT_PB,
PRINT_DECODED,
PRINT_ID
};
PrintEntryType ParsePrintType() {
if (ParseLeadingBoolValue(FLAGS_print_entries.c_str(), true) == false) {
return DONT_PRINT;
} else if (ParseLeadingBoolValue(FLAGS_print_entries.c_str(), false) == true ||
FLAGS_print_entries == "decoded") {
return PRINT_DECODED;
} else if (FLAGS_print_entries == "pb") {
return PRINT_PB;
} else if (FLAGS_print_entries == "id") {
return PRINT_ID;
} else {
LOG(FATAL) << "Unknown value for --print_entries: " << FLAGS_print_entries;
}
}
void PrintIdOnly(const LogEntryPB& entry) {
switch (entry.type()) {
case log::REPLICATE:
{
cout << entry.replicate().id().term() << "." << entry.replicate().id().index()
<< "@" << entry.replicate().timestamp() << "\t";
cout << "REPLICATE "
<< OperationType_Name(entry.replicate().op_type());
break;
}
case log::COMMIT:
{
cout << "COMMIT " << entry.commit().commited_op_id().term()
<< "." << entry.commit().commited_op_id().index();
break;
}
default:
cout << "UNKNOWN: " << SecureShortDebugString(entry);
}
cout << endl;
}
Status PrintDecodedWriteRequestPB(const string& indent,
const Schema& tablet_schema,
const WriteRequestPB& write,
const RequestIdPB* request_id) {
Schema request_schema;
RETURN_NOT_OK(SchemaFromPB(write.schema(), &request_schema));
Arena arena(32 * 1024);
RowOperationsPBDecoder dec(&write.row_operations(), &request_schema, &tablet_schema, &arena);
vector<DecodedRowOperation> ops;
RETURN_NOT_OK(dec.DecodeOperations(&ops));
cout << indent << "Tablet: " << write.tablet_id() << endl;
cout << indent << "RequestId: "
<< (request_id ? SecureShortDebugString(*request_id) : "None") << endl;
cout << indent << "Consistency: "
<< ExternalConsistencyMode_Name(write.external_consistency_mode()) << endl;
if (write.has_propagated_timestamp()) {
cout << indent << "Propagated TS: " << write.propagated_timestamp() << endl;
}
int i = 0;
for (const DecodedRowOperation& op : ops) {
// TODO (KUDU-515): Handle the case when a tablet's schema changes
// mid-segment.
cout << indent << "op " << (i++) << ": " << op.ToString(tablet_schema) << endl;
}
return Status::OK();
}
Status PrintDecoded(const LogEntryPB& entry, const Schema& tablet_schema) {
PrintIdOnly(entry);
const string indent = "\t";
if (entry.has_replicate()) {
// We can actually decode REPLICATE messages.
const ReplicateMsg& replicate = entry.replicate();
if (replicate.op_type() == consensus::WRITE_OP) {
RETURN_NOT_OK(PrintDecodedWriteRequestPB(
indent,
tablet_schema,
replicate.write_request(),
replicate.has_request_id() ? &replicate.request_id() : nullptr));
} else {
cout << indent << SecureShortDebugString(replicate) << endl;
}
} else if (entry.has_commit()) {
// For COMMIT we'll just dump the PB
cout << indent << SecureShortDebugString(entry.commit()) << endl;
}
return Status::OK();
}
} // anonymous namespace
template<class ProxyClass>
Status BuildProxy(const string& address,
uint16_t default_port,
unique_ptr<ProxyClass>* proxy) {
HostPort hp;
RETURN_NOT_OK(hp.ParseString(address, default_port));
shared_ptr<Messenger> messenger;
RETURN_NOT_OK(MessengerBuilder("tool").Build(&messenger));
vector<Sockaddr> resolved;
RETURN_NOT_OK(hp.ResolveAddresses(&resolved));
proxy->reset(new ProxyClass(messenger, resolved[0], hp.host()));
return Status::OK();
}
// Explicit specialization for callers outside this compilation unit.
template
Status BuildProxy(const string& address,
uint16_t default_port,
unique_ptr<ConsensusServiceProxy>* proxy);
template
Status BuildProxy(const string& address,
uint16_t default_port,
unique_ptr<TabletServerServiceProxy>* proxy);
template
Status BuildProxy(const string& address,
uint16_t default_port,
unique_ptr<TabletServerAdminServiceProxy>* proxy);
template
Status BuildProxy(const string& address,
uint16_t default_port,
unique_ptr<MasterServiceProxy>* proxy);
Status GetServerStatus(const string& address, uint16_t default_port,
ServerStatusPB* status) {
unique_ptr<GenericServiceProxy> proxy;
RETURN_NOT_OK(BuildProxy(address, default_port, &proxy));
GetStatusRequestPB req;
GetStatusResponsePB resp;
RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
RETURN_NOT_OK(proxy->GetStatus(req, &resp, &rpc));
if (!resp.has_status()) {
return Status::Incomplete("Server response did not contain status",
proxy->ToString());
}
*status = resp.status();
return Status::OK();
}
Status PrintSegment(const scoped_refptr<ReadableLogSegment>& segment) {
PrintEntryType print_type = ParsePrintType();
if (FLAGS_print_meta) {
cout << "Header:\n" << SecureDebugString(segment->header());
}
if (print_type != DONT_PRINT) {
Schema tablet_schema;
RETURN_NOT_OK(SchemaFromPB(segment->header().schema(), &tablet_schema));
LogEntryReader reader(segment.get());
while (true) {
unique_ptr<LogEntryPB> entry;
Status s = reader.ReadNextEntry(&entry);
if (s.IsEndOfFile()) {
break;
}
RETURN_NOT_OK(s);
if (print_type == PRINT_PB) {
if (FLAGS_truncate_data > 0) {
pb_util::TruncateFields(entry.get(), FLAGS_truncate_data);
}
cout << "Entry:\n" << SecureDebugString(*entry);
} else if (print_type == PRINT_DECODED) {
RETURN_NOT_OK(PrintDecoded(*entry, tablet_schema));
} else if (print_type == PRINT_ID) {
PrintIdOnly(*entry);
}
}
}
if (FLAGS_print_meta && segment->HasFooter()) {
cout << "Footer:\n" << SecureDebugString(segment->footer());
}
return Status::OK();
}
Status GetServerFlags(const std::string& address,
uint16_t default_port,
bool all_flags,
const std::string& flag_tags,
std::vector<server::GetFlagsResponsePB_Flag>* flags) {
unique_ptr<GenericServiceProxy> proxy;
RETURN_NOT_OK(BuildProxy(address, default_port, &proxy));
GetFlagsRequestPB req;
GetFlagsResponsePB resp;
RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
req.set_all_flags(all_flags);
for (StringPiece tag : strings::Split(flag_tags, ",", strings::SkipEmpty())) {
req.add_tags(tag.as_string());
}
RETURN_NOT_OK(proxy->GetFlags(req, &resp, &rpc));
flags->clear();
std::move(resp.flags().begin(), resp.flags().end(), std::back_inserter(*flags));
return Status::OK();
}
Status PrintServerFlags(const string& address, uint16_t default_port) {
vector<server::GetFlagsResponsePB_Flag> flags;
RETURN_NOT_OK(GetServerFlags(address, default_port, FLAGS_all_flags, FLAGS_flag_tags, &flags));
std::sort(flags.begin(), flags.end(),
[](const GetFlagsResponsePB::Flag& left,
const GetFlagsResponsePB::Flag& right) -> bool {
return left.name() < right.name();
});
DataTable table({ "flag", "value", "default value?", "tags" });
vector<string> tags;
for (const auto& flag : flags) {
tags.clear();
std::copy(flag.tags().begin(), flag.tags().end(), std::back_inserter(tags));
std::sort(tags.begin(), tags.end());
table.AddRow({ flag.name(),
flag.value(),
flag.is_default_value() ? "true" : "false",
JoinStrings(tags, ",") });
}
return table.PrintTo(cout);
}
Status SetServerFlag(const string& address, uint16_t default_port,
const string& flag, const string& value) {
unique_ptr<GenericServiceProxy> proxy;
RETURN_NOT_OK(BuildProxy(address, default_port, &proxy));
SetFlagRequestPB req;
SetFlagResponsePB resp;
RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
req.set_flag(flag);
req.set_value(value);
req.set_force(FLAGS_force);
RETURN_NOT_OK(proxy->SetFlag(req, &resp, &rpc));
switch (resp.result()) {
case server::SetFlagResponsePB::SUCCESS:
return Status::OK();
case server::SetFlagResponsePB::NOT_SAFE:
return Status::RemoteError(resp.msg() +
" (use --force flag to allow anyway)");
default:
return Status::RemoteError(SecureShortDebugString(resp));
}
}
string GetMasterAddresses(const client::KuduClient& client) {
return HostPort::ToCommaSeparatedString(client.data_->master_hostports());
}
bool MatchesAnyPattern(const vector<string>& patterns, const string& str) {
// Consider no filter a wildcard.
if (patterns.empty()) return true;
for (const auto& p : patterns) {
if (MatchPattern(str, p)) return true;
}
return false;
}
Status PrintServerStatus(const string& address, uint16_t default_port) {
ServerStatusPB status;
RETURN_NOT_OK(GetServerStatus(address, default_port, &status));
cout << SecureDebugString(status) << endl;
return Status::OK();
}
Status PrintServerTimestamp(const string& address, uint16_t default_port) {
unique_ptr<GenericServiceProxy> proxy;
RETURN_NOT_OK(BuildProxy(address, default_port, &proxy));
ServerClockRequestPB req;
ServerClockResponsePB resp;
RpcController rpc;
rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
RETURN_NOT_OK(proxy->ServerClock(req, &resp, &rpc));
if (!resp.has_timestamp()) {
return Status::Incomplete("Server response did not contain timestamp",
proxy->ToString());
}
cout << resp.timestamp() << endl;
return Status::OK();
}
namespace {
// Pretty print a table using the psql format. For example:
//
// uuid | rpc-addresses | seqno
// ----------------------------------+--------------------------------+------------------
// 335d132897de4bdb9b87443f2c487a42 | 126.rack1.dc1.example.com:7050 | 1492596790237811
// 7425c65d80f54f2da0a85494a5eb3e68 | 122.rack1.dc1.example.com:7050 | 1492596755322350
// dd23284d3a334f1a8306c19d89c1161f | 130.rack1.dc1.example.com:7050 | 1492596704536543
// d8009e07d82b4e66a7ab50f85e60bc30 | 136.rack1.dc1.example.com:7050 | 1492596696557549
// c108a85a68504c2bb9f49e4ee683d981 | 128.rack1.dc1.example.com:7050 | 1492596646623301
void PrettyPrintTable(const vector<string>& headers,
const vector<vector<string>>& columns,
ostream& out) {
CHECK_EQ(headers.size(), columns.size());
if (headers.empty()) return;
size_t num_columns = headers.size();
vector<size_t> widths;
for (int col = 0; col < num_columns; col++) {
size_t width = std::accumulate(columns[col].begin(), columns[col].end(), headers[col].size(),
[](size_t acc, const string& cell) {
return std::max(acc, cell.size());
});
widths.push_back(width);
}
// Print the header row.
for (int col = 0; col < num_columns; col++) {
int padding = widths[col] - headers[col].size();
out << setw(padding / 2) << "" << " " << headers[col];
if (col != num_columns - 1) out << setw((padding + 1) / 2) << "" << " |";
}
out << endl;
// Print the separator row.
out << setfill('-');
for (int col = 0; col < num_columns; col++) {
out << setw(widths[col] + 2) << "";
if (col != num_columns - 1) out << "+";
}
out << endl;
// Print the data rows.
out << setfill(' ');
int num_rows = columns.empty() ? 0 : columns[0].size();
for (int row = 0; row < num_rows; row++) {
for (int col = 0; col < num_columns; col++) {
const auto& value = columns[col][row];
out << " " << value;
if (col != num_columns - 1) {
size_t padding = widths[col] - value.size();
out << setw(padding) << "" << " |";
}
}
out << endl;
}
}
// Print a table using JSON formatting.
//
// The table is formatted as an array of objects. Each object corresponds
// to a row whose fields are the column values.
void JsonPrintTable(const vector<string>& headers,
const vector<vector<string>>& columns,
ostream& out) {
std::ostringstream stream;
JsonWriter writer(&stream, JsonWriter::COMPACT);
int num_columns = columns.size();
int num_rows = columns.empty() ? 0 : columns[0].size();
writer.StartArray();
for (int row = 0; row < num_rows; row++) {
writer.StartObject();
for (int col = 0; col < num_columns; col++) {
writer.String(headers[col]);
writer.String(columns[col][row]);
}
writer.EndObject();
}
writer.EndArray();
out << stream.str() << endl;
}
// Print the table using the provided separator. For example, with a comma
// separator:
//
// 335d132897de4bdb9b87443f2c487a42,126.rack1.dc1.example.com:7050,1492596790237811
// 7425c65d80f54f2da0a85494a5eb3e68,122.rack1.dc1.example.com:7050,1492596755322350
// dd23284d3a334f1a8306c19d89c1161f,130.rack1.dc1.example.com:7050,1492596704536543
// d8009e07d82b4e66a7ab50f85e60bc30,136.rack1.dc1.example.com:7050,1492596696557549
// c108a85a68504c2bb9f49e4ee683d981,128.rack1.dc1.example.com:7050,1492596646623301
void PrintTable(const vector<vector<string>>& columns, const string& separator, ostream& out) {
// TODO(dan): proper escaping of string values.
int num_columns = columns.size();
int num_rows = columns.empty() ? 0 : columns[0].size();
for (int row = 0; row < num_rows; row++) {
for (int col = 0; col < num_columns; col++) {
out << columns[col][row];
if (col != num_columns - 1) out << separator;
}
out << endl;
}
}
} // anonymous namespace
DataTable::DataTable(std::vector<string> col_names)
: column_names_(std::move(col_names)),
columns_(column_names_.size()) {
}
void DataTable::AddRow(std::vector<string> row) {
CHECK_EQ(row.size(), columns_.size());
int i = 0;
for (auto& v : row) {
columns_[i++].emplace_back(std::move(v));
}
}
void DataTable::AddColumn(string name, vector<string> column) {
if (!columns_.empty()) {
CHECK_EQ(column.size(), columns_[0].size());
}
column_names_.emplace_back(std::move(name));
columns_.emplace_back(std::move(column));
}
Status DataTable::PrintTo(ostream& out) const {
if (boost::iequals(FLAGS_format, "pretty")) {
PrettyPrintTable(column_names_, columns_, out);
} else if (boost::iequals(FLAGS_format, "space")) {
PrintTable(columns_, " ", out);
} else if (boost::iequals(FLAGS_format, "tsv")) {
PrintTable(columns_, " ", out);
} else if (boost::iequals(FLAGS_format, "csv")) {
PrintTable(columns_, ",", out);
} else if (boost::iequals(FLAGS_format, "json")) {
JsonPrintTable(column_names_, columns_, out);
} else {
return Status::InvalidArgument("unknown format (--format)", FLAGS_format);
}
return Status::OK();
}
LeaderMasterProxy::LeaderMasterProxy(client::sp::shared_ptr<KuduClient> client) :
client_(std::move(client)) {
}
Status LeaderMasterProxy::Init(const vector<string>& master_addrs, const MonoDelta& timeout) {
return KuduClientBuilder().master_server_addrs(master_addrs)
.default_rpc_timeout(timeout)
.default_admin_operation_timeout(timeout)
.Build(&client_);
}
Status LeaderMasterProxy::Init(const RunnerContext& context) {
const string& master_addrs = FindOrDie(context.required_args, kMasterAddressesArg);
return Init(strings::Split(master_addrs, ","), MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
}
template<typename Req, typename Resp>
Status LeaderMasterProxy::SyncRpc(const Req& req,
Resp* resp,
const char* func_name,
const boost::function<Status(master::MasterServiceProxy*,
const Req&, Resp*,
rpc::RpcController*)>& func) {
MonoTime deadline = MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_timeout_ms);
return client_->data_->SyncLeaderMasterRpc(deadline, client_.get(), req, resp,
func_name, func, {});
}
// Explicit specializations for callers outside this compilation unit.
template
Status LeaderMasterProxy::SyncRpc(const ListTabletServersRequestPB& req,
ListTabletServersResponsePB* resp,
const char* func_name,
const boost::function<Status(MasterServiceProxy*,
const ListTabletServersRequestPB&,
ListTabletServersResponsePB*,
RpcController*)>& func);
template
Status LeaderMasterProxy::SyncRpc(const ListMastersRequestPB& req,
ListMastersResponsePB* resp,
const char* func_name,
const boost::function<Status(MasterServiceProxy*,
const ListMastersRequestPB&,
ListMastersResponsePB*,
RpcController*)>& func);
template
Status LeaderMasterProxy::SyncRpc(const ReplaceTabletRequestPB& req,
ReplaceTabletResponsePB* resp,
const char* func_name,
const boost::function<Status(MasterServiceProxy*,
const ReplaceTabletRequestPB&,
ReplaceTabletResponsePB*,
RpcController*)>& func);
const int ControlShellProtocol::kMaxMessageBytes = 1024 * 1024;
ControlShellProtocol::ControlShellProtocol(SerializationMode serialization_mode,
CloseMode close_mode,
int read_fd,
int write_fd)
: serialization_mode_(serialization_mode),
close_mode_(close_mode),
read_fd_(read_fd),
write_fd_(write_fd) {
}
ControlShellProtocol::~ControlShellProtocol() {
if (close_mode_ == CloseMode::CLOSE_ON_DESTROY) {
int ret;
RETRY_ON_EINTR(ret, close(read_fd_));
RETRY_ON_EINTR(ret, close(write_fd_));
}
}
template <class M>
Status ControlShellProtocol::ReceiveMessage(M* message) {
switch (serialization_mode_) {
case SerializationMode::JSON:
{
// Read and accumulate one byte at a time, looking for the newline.
//
// TODO(adar): it would be more efficient to read a chunk of data, look
// for a newline, and if found, store the remainder for the next message.
faststring buf;
faststring one_byte;
one_byte.resize(1);
while (true) {
RETURN_NOT_OK_PREPEND(DoRead(&one_byte), "unable to receive message byte");
if (one_byte[0] == '\n') {
break;
}
buf.push_back(one_byte[0]);
}
// Parse the JSON-encoded message.
const auto& google_status =
google::protobuf::util::JsonStringToMessage(buf.ToString(), message);
if (!google_status.ok()) {
return Status::InvalidArgument(
Substitute("unable to parse JSON: $0", buf.ToString()),
google_status.error_message().ToString());
}
break;
}
case SerializationMode::PB:
{
// Read four bytes of size (big-endian).
faststring size_buf;
size_buf.resize(sizeof(uint32_t));
RETURN_NOT_OK_PREPEND(DoRead(&size_buf), "unable to receive message size");
uint32_t body_size = NetworkByteOrder::Load32(size_buf.data());
if (body_size > kMaxMessageBytes) {
return Status::IOError(
Substitute("message size ($0) exceeds maximum message size ($1)",
body_size, kMaxMessageBytes));
}
// Read the variable size body.
faststring body_buf;
body_buf.resize(body_size);
RETURN_NOT_OK_PREPEND(DoRead(&body_buf), "unable to receive message body");
// Parse the body into a PB request.
RETURN_NOT_OK_PREPEND(pb_util::ParseFromArray(
message, body_buf.data(), body_buf.length()),
Substitute("unable to parse PB: $0", body_buf.ToString()));
break;
}
default: LOG(FATAL) << "Unknown mode";
}
VLOG(1) << "Received message: " << pb_util::SecureDebugString(*message);
return Status::OK();
}
template <class M>
Status ControlShellProtocol::SendMessage(const M& message) {
VLOG(1) << "Sending message: " << pb_util::SecureDebugString(message);
faststring buf;
switch (serialization_mode_) {
case SerializationMode::JSON:
{
string serialized;
const auto& google_status =
google::protobuf::util::MessageToJsonString(message, &serialized);
if (!google_status.ok()) {
return Status::InvalidArgument(Substitute(
"unable to serialize JSON: $0", pb_util::SecureDebugString(message)),
google_status.error_message().ToString());
}
buf.append(serialized);
buf.append("\n");
break;
}
case SerializationMode::PB:
{
size_t msg_size = message.ByteSizeLong();
buf.resize(sizeof(uint32_t) + msg_size);
NetworkByteOrder::Store32(buf.data(), msg_size);
if (!message.SerializeWithCachedSizesToArray(buf.data() + sizeof(uint32_t))) {
return Status::Corruption("failed to serialize PB to array");
}
break;
}
default:
break;
}
RETURN_NOT_OK_PREPEND(DoWrite(buf), "unable to send message");
return Status::OK();
}
Status ControlShellProtocol::DoRead(faststring* buf) {
uint8_t* pos = buf->data();
size_t rem = buf->length();
while (rem > 0) {
ssize_t r;
RETRY_ON_EINTR(r, read(read_fd_, pos, rem));
if (r == -1) {
return Status::IOError("Error reading from pipe", "", errno);
}
if (r == 0) {
return Status::EndOfFile("Other end of pipe was closed");
}
DCHECK_GE(rem, r);
rem -= r;
pos += r;
}
return Status::OK();
}
Status ControlShellProtocol::DoWrite(const faststring& buf) {
const uint8_t* pos = buf.data();
size_t rem = buf.length();
while (rem > 0) {
ssize_t r;
RETRY_ON_EINTR(r, write(write_fd_, pos, rem));
if (r == -1) {
if (errno == EPIPE) {
return Status::EndOfFile("Other end of pipe was closed");
}
return Status::IOError("Error writing to pipe", "", errno);
}
DCHECK_GE(rem, r);
rem -= r;
pos += r;
}
return Status::OK();
}
// Explicit specialization for callers outside this compilation unit.
template
Status ControlShellProtocol::ReceiveMessage(ControlShellRequestPB* message);
template
Status ControlShellProtocol::ReceiveMessage(ControlShellResponsePB* message);
template
Status ControlShellProtocol::SendMessage(const ControlShellRequestPB& message);
template
Status ControlShellProtocol::SendMessage(const ControlShellResponsePB& message);
} // namespace tools
} // namespace kudu