// blob: 80c14ba296b07dcd2c6ba93759eb37fdafc1e80f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tools/tool_action_common.h"
#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include "kudu/common/common.pb.h"
#include "kudu/common/schema.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/consensus/log.pb.h"
#include "kudu/consensus/log_util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/numbers.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/server/server_base.pb.h"
#include "kudu/server/server_base.proxy.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.proxy.h"
#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"
// Command-line flags (gflags) consumed by the tool actions in this file.

// --force: forwarded as the 'force' field of the SetFlag request built by
// SetServerFlag().
DEFINE_bool(force, false, "If true, allows the set_flag command to set a flag "
"which is not explicitly marked as runtime-settable. Such flag "
"changes may be simply ignored on the server, or may cause the "
"server to crash.");
// --print_meta: when true, PrintSegment() prints the segment header and, if
// the segment has one, its footer.
DEFINE_bool(print_meta, true, "Include metadata in output");
// --print_entries: selects how PrintSegment() renders each log entry. The
// value is interpreted by ParsePrintType(); an unrecognized value is fatal.
DEFINE_string(print_entries, "decoded",
"How to print entries:\n"
" false|0|no = don't print\n"
" true|1|yes|decoded = print them decoded\n"
" pb = print the raw protobuf\n"
" id = print only their ids");
// --timeout_ms: timeout applied to each RPC issued from this file. The default
// of 1000 * 60 ms is one minute.
DEFINE_int64(timeout_ms, 1000 * 60, "RPC timeout in milliseconds");
// --truncate_data: within this file it is only consulted by PrintSegment()
// when entries are printed as raw protobufs (--print_entries=pb).
DEFINE_int32(truncate_data, 100,
"Truncate the data fields to the given number of bytes "
"before printing. Set to 0 to disable");
namespace kudu {
namespace tools {
using consensus::ConsensusServiceProxy;
using consensus::ReplicateMsg;
using log::LogEntryPB;
using log::LogEntryReader;
using log::ReadableLogSegment;
using rpc::Messenger;
using rpc::MessengerBuilder;
using rpc::RequestIdPB;
using rpc::RpcController;
using server::GenericServiceProxy;
using server::GetStatusRequestPB;
using server::GetStatusResponsePB;
using server::ServerClockRequestPB;
using server::ServerClockResponsePB;
using server::ServerStatusPB;
using server::SetFlagRequestPB;
using server::SetFlagResponsePB;
using std::cout;
using std::endl;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using tserver::TabletServerAdminServiceProxy;
using tserver::TabletServerServiceProxy;
using tserver::WriteRequestPB;
// Name and human-readable description of the argument that carries the Kudu
// master addresses.
// NOTE(review): presumably declared 'extern' in tool_action_common.h so other
// tool actions can share them -- confirm against the header.
const char* const kMasterAddressesArg = "master_addresses";
const char* const kMasterAddressesArgDesc = "Comma-separated list of Kudu "
"Master addresses where each address is of form 'hostname:port'";
// Name and human-readable description of the argument that carries a tablet
// identifier.
const char* const kTabletIdArg = "tablet_id";
const char* const kTabletIdArgDesc = "Tablet Identifier";
namespace {
// How PrintSegment() renders each log entry. The value is derived from the
// --print_entries flag by ParsePrintType().
enum PrintEntryType {
// Entries are neither read nor printed.
DONT_PRINT,
// Each entry is dumped using its protobuf DebugString().
PRINT_PB,
// Each entry is printed through PrintDecoded().
PRINT_DECODED,
// Only the one-line summary produced by PrintIdOnly() is printed.
PRINT_ID
};
// Translates the --print_entries flag into a PrintEntryType.
//
// Values recognized as boolean 'false' select DONT_PRINT; values recognized as
// boolean 'true', as well as "decoded", select PRINT_DECODED; "pb" and "id"
// select PRINT_PB and PRINT_ID respectively. Any other value is fatal.
PrintEntryType ParsePrintType() {
  const string& value = FLAGS_print_entries;
  const char* const raw = value.c_str();

  // NOTE(review): the second argument is presumably the result used when the
  // text is not a recognizable boolean, which is why the two probes below pass
  // opposite values -- confirm against ParseLeadingBoolValue()'s declaration.
  if (!ParseLeadingBoolValue(raw, true)) {
    return DONT_PRINT;
  }
  if (ParseLeadingBoolValue(raw, false) || value == "decoded") {
    return PRINT_DECODED;
  }
  if (value == "pb") {
    return PRINT_PB;
  }
  if (value == "id") {
    return PRINT_ID;
  }
  LOG(FATAL) << "Unknown value for --print_entries: " << value;
}
void PrintIdOnly(const LogEntryPB& entry) {
switch (entry.type()) {
case log::REPLICATE:
{
cout << entry.replicate().id().term() << "." << entry.replicate().id().index()
<< "@" << entry.replicate().timestamp() << "\t";
cout << "REPLICATE "
<< OperationType_Name(entry.replicate().op_type());
break;
}
case log::COMMIT:
{
cout << "COMMIT " << entry.commit().commited_op_id().term()
<< "." << entry.commit().commited_op_id().index();
break;
}
default:
cout << "UNKNOWN: " << entry.ShortDebugString();
}
cout << endl;
}
// Decodes the row operations carried by 'write' and prints them to stdout,
// preceded by the request's tablet id, request id, external consistency mode
// and, when set, its propagated timestamp. Every line starts with 'indent'.
//
// 'request_id' may be null, in which case "None" is printed in its place.
//
// Returns a non-OK status, before anything has been printed, if the request's
// schema cannot be converted or its row operations cannot be decoded.
Status PrintDecodedWriteRequestPB(const string& indent,
                                  const Schema& tablet_schema,
                                  const WriteRequestPB& write,
                                  const RequestIdPB* request_id) {
  // All decoding happens up front so that a failure produces no partial output.
  Schema client_schema;
  RETURN_NOT_OK(SchemaFromPB(write.schema(), &client_schema));

  Arena arena(32 * 1024, 1024 * 1024);
  RowOperationsPBDecoder decoder(&write.row_operations(), &client_schema,
                                 &tablet_schema, &arena);
  vector<DecodedRowOperation> decoded_ops;
  RETURN_NOT_OK(decoder.DecodeOperations(&decoded_ops));

  const string request_id_str =
      request_id != nullptr ? request_id->ShortDebugString() : "None";
  cout << indent << "Tablet: " << write.tablet_id() << endl
       << indent << "RequestId: " << request_id_str << endl
       << indent << "Consistency: "
       << ExternalConsistencyMode_Name(write.external_consistency_mode()) << endl;
  if (write.has_propagated_timestamp()) {
    cout << indent << "Propagated TS: " << write.propagated_timestamp() << endl;
  }

  for (size_t idx = 0; idx < decoded_ops.size(); ++idx) {
    // TODO (KUDU-515): Handle the case when a tablet's schema changes
    // mid-segment.
    cout << indent << "op " << idx << ": "
         << decoded_ops[idx].ToString(tablet_schema) << endl;
  }
  return Status::OK();
}
// Prints 'entry' in decoded form: the one-line summary from PrintIdOnly(),
// followed by tab-indented detail.
//
// For a REPLICATE entry holding a write operation, the write request is
// decoded against 'tablet_schema' and printed row by row; any other REPLICATE
// entry, and any COMMIT entry, is dumped as a short protobuf debug string.
// Entries that carry neither message get no detail lines.
//
// Returns the status of decoding the write request, or OK otherwise.
Status PrintDecoded(const LogEntryPB& entry, const Schema& tablet_schema) {
  PrintIdOnly(entry);
  const string prefix = "\t";

  if (!entry.has_replicate()) {
    if (entry.has_commit()) {
      // For COMMIT we'll just dump the PB.
      cout << prefix << entry.commit().ShortDebugString() << endl;
    }
    return Status::OK();
  }

  // Only write operations can be decoded further.
  const ReplicateMsg& msg = entry.replicate();
  if (msg.op_type() != consensus::WRITE_OP) {
    cout << prefix << msg.ShortDebugString() << endl;
    return Status::OK();
  }

  const RequestIdPB* request_id =
      msg.has_request_id() ? &msg.request_id() : nullptr;
  return PrintDecodedWriteRequestPB(prefix, tablet_schema,
                                    msg.write_request(), request_id);
}
} // anonymous namespace
// Builds a proxy of type 'ProxyClass' for the server at 'address'.
//
// 'address' is parsed together with 'default_port' (presumably the port used
// when 'address' does not name one -- confirm against HostPort::ParseString).
// On success, '*proxy' owns a new proxy constructed from a freshly built
// messenger named "tool" and the first resolved socket address.
//
// Returns a non-OK status if parsing the address, building the messenger or
// resolving the host fails, or if resolution yields no addresses at all.
template<class ProxyClass>
Status BuildProxy(const string& address,
                  uint16_t default_port,
                  unique_ptr<ProxyClass>* proxy) {
  HostPort hp;
  RETURN_NOT_OK(hp.ParseString(address, default_port));
  shared_ptr<Messenger> messenger;
  RETURN_NOT_OK(MessengerBuilder("tool").Build(&messenger));
  vector<Sockaddr> resolved;
  RETURN_NOT_OK(hp.ResolveAddresses(&resolved));
  // Defensive guard: never index into an empty result, even if the resolver
  // reported success.
  if (resolved.empty()) {
    return Status::NetworkError("Unable to resolve address", address);
  }
  proxy->reset(new ProxyClass(messenger, resolved[0]));
  return Status::OK();
}
// Explicit instantiations of BuildProxy() for the proxy types needed by
// callers outside this compilation unit. These are instantiations rather than
// specializations: each one reuses the generic template body unchanged.
// GenericServiceProxy is not listed; it is instantiated implicitly by the
// functions in this file that build such a proxy.
template
Status BuildProxy(const string& address,
uint16_t default_port,
unique_ptr<ConsensusServiceProxy>* proxy);
template
Status BuildProxy(const string& address,
uint16_t default_port,
unique_ptr<TabletServerServiceProxy>* proxy);
template
Status BuildProxy(const string& address,
uint16_t default_port,
unique_ptr<TabletServerAdminServiceProxy>* proxy);
// Fetches the status of the server at 'address' through the generic service's
// GetStatus RPC and copies it into '*status'.
//
// The RPC timeout is taken from the --timeout_ms flag. Returns Incomplete if
// the response carries no status; otherwise propagates any failure from
// building the proxy or performing the call.
Status GetServerStatus(const string& address, uint16_t default_port,
                       ServerStatusPB* status) {
  unique_ptr<GenericServiceProxy> generic_proxy;
  RETURN_NOT_OK(BuildProxy(address, default_port, &generic_proxy));

  GetStatusRequestPB request;
  GetStatusResponsePB response;
  RpcController controller;
  controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
  RETURN_NOT_OK(generic_proxy->GetStatus(request, &response, &controller));

  if (response.has_status()) {
    *status = response.status();
    return Status::OK();
  }
  return Status::Incomplete("Server response did not contain status",
                            generic_proxy->ToString());
}
// Prints the contents of a log segment to stdout.
//
// When --print_meta is set, the segment header is printed first and the
// footer, if the segment has one, last. In between, unless --print_entries
// disables entry printing, every entry is read in order and rendered in the
// selected mode: raw protobuf (with data fields truncated to --truncate_data
// bytes when that flag is positive), decoded, or id-only.
//
// Returns a non-OK status if the header's schema cannot be converted, if
// reading an entry fails for a reason other than end-of-file, or if decoding
// an entry fails.
Status PrintSegment(const scoped_refptr<ReadableLogSegment>& segment) {
  const PrintEntryType mode = ParsePrintType();

  if (FLAGS_print_meta) {
    cout << "Header:\n" << segment->header().DebugString();
  }

  if (mode != DONT_PRINT) {
    Schema tablet_schema;
    RETURN_NOT_OK(SchemaFromPB(segment->header().schema(), &tablet_schema));

    LogEntryReader reader(segment.get());
    LogEntryPB entry;
    // End-of-file is the normal loop exit; any other read failure is returned.
    for (Status s = reader.ReadNextEntry(&entry);
         !s.IsEndOfFile();
         s = reader.ReadNextEntry(&entry)) {
      RETURN_NOT_OK(s);
      switch (mode) {
        case PRINT_PB:
          if (FLAGS_truncate_data > 0) {
            pb_util::TruncateFields(&entry, FLAGS_truncate_data);
          }
          cout << "Entry:\n" << entry.DebugString();
          break;
        case PRINT_DECODED:
          RETURN_NOT_OK(PrintDecoded(entry, tablet_schema));
          break;
        case PRINT_ID:
          PrintIdOnly(entry);
          break;
        default:
          break;
      }
    }
  }

  if (FLAGS_print_meta && segment->HasFooter()) {
    cout << "Footer:\n" << segment->footer().DebugString();
  }
  return Status::OK();
}
// Asks the server at 'address' to set the flag named 'flag' to 'value' using
// the generic service's SetFlag RPC.
//
// The request's 'force' field mirrors the --force flag and the RPC timeout
// comes from --timeout_ms. Returns OK when the server reports SUCCESS. A
// NOT_SAFE result becomes a RemoteError carrying the server's message plus a
// hint about --force; any other result becomes a RemoteError carrying the
// response's short debug string. Failures to build the proxy or perform the
// call are propagated.
Status SetServerFlag(const string& address, uint16_t default_port,
                     const string& flag, const string& value) {
  unique_ptr<GenericServiceProxy> generic_proxy;
  RETURN_NOT_OK(BuildProxy(address, default_port, &generic_proxy));

  SetFlagRequestPB request;
  SetFlagResponsePB response;
  RpcController controller;
  request.set_flag(flag);
  request.set_value(value);
  request.set_force(FLAGS_force);
  controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
  RETURN_NOT_OK(generic_proxy->SetFlag(request, &response, &controller));

  const auto outcome = response.result();
  if (outcome == server::SetFlagResponsePB::SUCCESS) {
    return Status::OK();
  }
  if (outcome == server::SetFlagResponsePB::NOT_SAFE) {
    return Status::RemoteError(response.msg() +
                               " (use --force flag to allow anyway)");
  }
  return Status::RemoteError(response.ShortDebugString());
}
// Retrieves the status of the server at 'address' with GetServerStatus() and
// prints its protobuf debug string to stdout, followed by a newline.
//
// Returns whatever error GetServerStatus() reports, or OK after printing.
Status PrintServerStatus(const string& address, uint16_t default_port) {
  ServerStatusPB server_status;
  RETURN_NOT_OK(GetServerStatus(address, default_port, &server_status));

  cout << server_status.DebugString() << endl;
  return Status::OK();
}
// Queries the server at 'address' with the generic service's ServerClock RPC
// and prints the returned timestamp to stdout, followed by a newline.
//
// The RPC timeout is taken from the --timeout_ms flag. Returns Incomplete if
// the response carries no timestamp; otherwise propagates any failure from
// building the proxy or performing the call.
Status PrintServerTimestamp(const string& address, uint16_t default_port) {
  unique_ptr<GenericServiceProxy> generic_proxy;
  RETURN_NOT_OK(BuildProxy(address, default_port, &generic_proxy));

  ServerClockRequestPB request;
  ServerClockResponsePB response;
  RpcController controller;
  controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
  RETURN_NOT_OK(generic_proxy->ServerClock(request, &response, &controller));

  if (response.has_timestamp()) {
    cout << response.timestamp() << endl;
    return Status::OK();
  }
  return Status::Incomplete("Server response did not contain timestamp",
                            generic_proxy->ToString());
}
} // namespace tools
} // namespace kudu