blob: a17c5ead5761500b2fb8a3280aefdcdeef4ddcf9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tools/ksck_remote.h"
#include "kudu/client/client.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/strings/util.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/net/sockaddr.h"
DECLARE_int64(timeout_ms); // defined in tool_action_common
DEFINE_bool(checksum_cache_blocks, false, "Should the checksum scanners cache the read blocks");
namespace kudu {
namespace tools {
// Name given to the RPC messenger created for ksck's connections to tablet servers.
static const std::string kMessengerName{"ksck"};
using client::KuduClient;
using client::KuduClientBuilder;
using client::KuduReplica;
using client::KuduScanToken;
using client::KuduScanTokenBuilder;
using client::KuduTable;
using client::KuduTabletServer;
using rpc::Messenger;
using rpc::MessengerBuilder;
using rpc::RpcController;
using std::shared_ptr;
using std::string;
using std::vector;
using strings::Substitute;
// Returns the timeout applied to ksck's RPCs and client operations, taken
// from the --timeout_ms flag (interpreted as milliseconds).
MonoDelta GetDefaultTimeout() {
  const int64_t timeout_ms = FLAGS_timeout_ms;
  return MonoDelta::FromMilliseconds(timeout_ms);
}
// Resolves this tablet server's host/port and creates the two RPC proxies used
// to talk to it: the generic server service proxy and the tablet server
// service proxy.
//
// Returns an error if the address cannot be parsed or resolved, or if
// resolution produced no address at all.
Status RemoteKsckTabletServer::Init() {
  vector<Sockaddr> addresses;
  RETURN_NOT_OK(ParseAddressList(
      host_port_.ToString(),
      tserver::TabletServer::kDefaultPort, &addresses));
  // Never index into an empty list: report the problem instead of invoking UB.
  if (addresses.empty()) {
    return Status::NetworkError(
        Substitute("could not resolve any address for tablet server $0",
                   host_port_.ToString()));
  }
  // Only the first resolved address is used for both proxies.
  const Sockaddr& addr = addresses[0];
  generic_proxy_.reset(new server::GenericServiceProxy(messenger_, addr));
  ts_proxy_.reset(new tserver::TabletServerServiceProxy(messenger_, addr));
  return Status::OK();
}
// Fetches this tablet server's state using three synchronous RPCs, each bounded
// by GetDefaultTimeout():
//   1. Ping, to check that the server is reachable;
//   2. ListTablets (without schema info), to refresh 'tablet_status_map_';
//   3. ServerClock, to record the server's current timestamp.
//
// 'state_' is set to kFetched only if all three steps succeed; otherwise it
// stays kFetchFailed and the first error is returned.
Status RemoteKsckTabletServer::FetchInfo() {
  // Pessimistic default; flipped only at the very end on full success.
  state_ = kFetchFailed;

  {
    tserver::PingRequestPB req;
    tserver::PingResponsePB resp;
    RpcController rpc;
    rpc.set_timeout(GetDefaultTimeout());
    RETURN_NOT_OK_PREPEND(ts_proxy_->Ping(req, &resp, &rpc),
                          "could not send Ping RPC to server");
  }

  {
    tserver::ListTabletsRequestPB req;
    tserver::ListTabletsResponsePB resp;
    RpcController rpc;
    rpc.set_timeout(GetDefaultTimeout());
    req.set_need_schema_info(false);
    RETURN_NOT_OK_PREPEND(ts_proxy_->ListTablets(req, &resp, &rpc),
                          "could not list tablets");
    // The RPC can succeed at the transport level while the server reports an
    // application-level error; don't mistake that for "no tablets".
    if (resp.has_error()) {
      return StatusFromPB(resp.error().status())
          .CloneAndPrepend("could not list tablets");
    }
    tablet_status_map_.clear();
    for (auto& status : *resp.mutable_status_and_schema()) {
      // Swap instead of copy: the response is discarded right after this loop.
      tablet_status_map_[status.tablet_status().tablet_id()].Swap(
          status.mutable_tablet_status());
    }
  }

  {
    server::ServerClockRequestPB req;
    server::ServerClockResponsePB resp;
    RpcController rpc;
    rpc.set_timeout(GetDefaultTimeout());
    RETURN_NOT_OK_PREPEND(generic_proxy_->ServerClock(req, &resp, &rpc),
                          "could not fetch timestamp");
    // A malformed response from a remote server must not crash the ksck tool;
    // surface it as an error instead of CHECK-failing.
    if (!resp.has_timestamp()) {
      return Status::RemoteError("could not fetch timestamp",
                                 "server clock response is missing the timestamp");
    }
    timestamp_ = resp.timestamp();
  }

  state_ = kFetched;
  return Status::OK();
}
class ChecksumStepper;

// One-shot adapter between an RPC completion callback and a ChecksumStepper.
// It is used to collate the results of parallel checksum scans: each instance
// serves exactly one RPC.
class ChecksumCallbackHandler {
 public:
  explicit ChecksumCallbackHandler(ChecksumStepper* const owner)
      : stepper(DCHECK_NOTNULL(owner)) {}

  // Called when the RPC completes. It forwards to the stepper and, once that
  // call has returned, deletes 'this'.
  void Run();

 private:
  ChecksumStepper* const stepper;
};
// Carries out a multi-part checksum scan of one tablet as a "conversation" of
// Checksum RPCs sent to a single tablet server.
// If any error or timeout is encountered, the checksum operation fails.
//
// Lifetime: instances are heap-allocated and own themselves. After reporting
// the final result through ChecksumProgressCallbacks::Finished(), the stepper
// deletes itself. This also applies when Start() fails before any RPC is sent.
class ChecksumStepper {
 public:
  ChecksumStepper(string tablet_id, const Schema& schema, string server_uuid,
                  ChecksumOptions options, ChecksumProgressCallbacks* callbacks,
                  shared_ptr<tserver::TabletServerServiceProxy> proxy)
      : schema_(schema),
        tablet_id_(std::move(tablet_id)),
        server_uuid_(std::move(server_uuid)),
        options_(std::move(options)),
        callbacks_(callbacks),
        proxy_(std::move(proxy)),
        call_seq_id_(0),
        checksum_(0) {
    DCHECK(proxy_);
  }

  // Starts the scan. Call it exactly once, on a heap-allocated instance. The
  // caller must not dereference the pointer afterwards, because the stepper
  // may already have deleted itself by the time this returns.
  void Start() {
    Status s = SchemaToColumnPBs(schema_, &cols_,
                                 SCHEMA_PB_WITHOUT_IDS | SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES);
    if (!s.ok()) {
      callbacks_->Finished(s, 0);
      // No RPC was sent, so HandleResponse() will never run to free us.
      // Without this the self-owned stepper would leak.
      delete this;
      return;
    }
    SendRequest(kNewRequest);
  }

  // Completion handler for each Checksum RPC; it is reached through
  // ChecksumCallbackHandler::Run().
  // It reports progress and then does one of two things:
  //   - reports the final result (or error) and deletes 'this';
  //   - sends the next continuation request and keeps 'this' alive.
  void HandleResponse() {
    gscoped_ptr<ChecksumStepper> deleter(this);
    Status s = rpc_.status();
    if (s.ok() && resp_.has_error()) {
      s = StatusFromPB(resp_.error().status());
    }
    if (!s.ok()) {
      callbacks_->Finished(s, 0);
      return;  // Deletes 'this'.
    }

    if (resp_.has_resource_metrics() || resp_.has_rows_checksummed()) {
      int64_t bytes = resp_.resource_metrics().cfile_cache_miss_bytes() +
          resp_.resource_metrics().cfile_cache_hit_bytes();
      callbacks_->Progress(resp_.rows_checksummed(), bytes);
    }
    DCHECK(resp_.has_checksum());
    checksum_ = resp_.checksum();

    // Report back with results.
    if (!resp_.has_more_results()) {
      callbacks_->Finished(s, checksum_);
      return;  // Deletes 'this'.
    }

    // We're not done scanning yet. Fetch the next chunk.
    if (resp_.has_scanner_id()) {
      scanner_id_ = resp_.scanner_id();
    }
    SendRequest(kContinueRequest);
    ignore_result(deleter.release());  // We have more work to do.
  }

 private:
  enum RequestType {
    kNewRequest,
    kContinueRequest
  };

  // Fills 'req_' for the given request type and sends it asynchronously. The
  // response is written into 'resp_'/'rpc_' and delivered via a fresh
  // ChecksumCallbackHandler.
  void SendRequest(RequestType type) {
    switch (type) {
      case kNewRequest: {
        req_.set_call_seq_id(call_seq_id_);
        req_.mutable_new_request()->mutable_projected_columns()->CopyFrom(cols_);
        req_.mutable_new_request()->set_tablet_id(tablet_id_);
        req_.mutable_new_request()->set_cache_blocks(FLAGS_checksum_cache_blocks);
        if (options_.use_snapshot) {
          req_.mutable_new_request()->set_read_mode(READ_AT_SNAPSHOT);
          req_.mutable_new_request()->set_snap_timestamp(options_.snapshot_timestamp);
        }
        break;
      }
      case kContinueRequest: {
        req_.Clear();
        resp_.Clear();
        rpc_.Reset();
        req_.set_call_seq_id(++call_seq_id_);
        DCHECK(!scanner_id_.empty());
        req_.mutable_continue_request()->set_scanner_id(scanner_id_);
        req_.mutable_continue_request()->set_previous_checksum(checksum_);
        break;
      }
      default:
        LOG(FATAL) << "Unknown type";
        break;
    }
    // Set the timeout for every request, continuations included. This keeps
    // each RPC explicitly bounded instead of depending on whether
    // RpcController::Reset() preserves a previously configured timeout.
    rpc_.set_timeout(GetDefaultTimeout());

    gscoped_ptr<ChecksumCallbackHandler> handler(new ChecksumCallbackHandler(this));
    rpc::ResponseCallback cb = boost::bind(&ChecksumCallbackHandler::Run, handler.get());
    proxy_->ChecksumAsync(req_, &resp_, &rpc_, cb);
    // The handler deletes itself in Run() once the RPC completes.
    ignore_result(handler.release());
  }

  const Schema schema_;
  // Projection sent with the initial request; built from 'schema_' in Start().
  google::protobuf::RepeatedPtrField<ColumnSchemaPB> cols_;

  const string tablet_id_;
  const string server_uuid_;
  const ChecksumOptions options_;
  ChecksumProgressCallbacks* const callbacks_;
  const shared_ptr<tserver::TabletServerServiceProxy> proxy_;

  // Incremented for every continuation request.
  uint32_t call_seq_id_;
  // Scanner id returned by the server; required for continuation requests.
  string scanner_id_;
  // Running checksum, fed back to the server as 'previous_checksum'.
  uint64_t checksum_;

  // Per-RPC state, reused (cleared) across the requests of one scan.
  tserver::ChecksumRequestPB req_;
  tserver::ChecksumResponsePB resp_;
  RpcController rpc_;
};
void ChecksumCallbackHandler::Run() {
stepper->HandleResponse();
delete this;
}
void RemoteKsckTabletServer::RunTabletChecksumScanAsync(
const string& tablet_id,
const Schema& schema,
const ChecksumOptions& options,
ChecksumProgressCallbacks* callbacks) {
gscoped_ptr<ChecksumStepper> stepper(
new ChecksumStepper(tablet_id, schema, uuid(), options, callbacks, ts_proxy_));
stepper->Start();
ignore_result(stepper.release()); // Deletes self on callback.
}
// Builds the Kudu client used to query the masters and stores it in 'client_'.
// Every client RPC defaults to GetDefaultTimeout().
Status RemoteKsckMaster::Connect() {
  // The unused local 'client' is removed: the client is built directly into
  // the 'client_' member.
  KuduClientBuilder builder;
  builder.default_rpc_timeout(GetDefaultTimeout());
  builder.master_server_addrs(master_addresses_);
  return builder.Build(&client_);
}
// Factory method. It creates the RPC messenger shared by ksck's connections
// and, if that succeeds, stores a new RemoteKsckMaster for 'master_addresses'
// in '*master'.
Status RemoteKsckMaster::Build(const vector<string>& master_addresses,
                               shared_ptr<KsckMaster>* master) {
  MessengerBuilder messenger_builder(kMessengerName);
  shared_ptr<Messenger> ksck_messenger;
  RETURN_NOT_OK(messenger_builder.Build(&ksck_messenger));
  master->reset(new RemoteKsckMaster(master_addresses, ksck_messenger));
  return Status::OK();
}
// Replaces '*tablet_servers' with one initialized RemoteKsckTabletServer per
// tablet server known to the masters, keyed by server UUID.
//
// On error, '*tablet_servers' is left unchanged.
Status RemoteKsckMaster::RetrieveTabletServers(TSMap* tablet_servers) {
  vector<KuduTabletServer*> servers;
  ElementDeleter deleter(&servers);
  RETURN_NOT_OK(client_->ListTabletServers(&servers));

  // Build into a local map and publish it only after every server has been
  // initialized, so a failure part-way does not leave a half-populated map.
  // This matches the temp-then-publish approach of RetrieveTablesList().
  TSMap fetched;
  for (const auto* s : servers) {
    shared_ptr<RemoteKsckTabletServer> ts(
        new RemoteKsckTabletServer(s->uuid(),
                                   HostPort(s->hostname(), s->port()),
                                   messenger_));
    RETURN_NOT_OK(ts->Init());
    InsertOrDie(&fetched, ts->uuid(), ts);
  }
  tablet_servers->swap(fetched);
  return Status::OK();
}
// Replaces '*tables' with one KsckTable per table known to the cluster. Each
// entry records the table's name, schema and replication factor.
//
// On error, '*tables' is left unchanged.
Status RemoteKsckMaster::RetrieveTablesList(vector<shared_ptr<KsckTable>>* tables) {
  vector<string> table_names;
  RETURN_NOT_OK(client_->ListTables(&table_names));

  // Collect into a temporary so '*tables' is replaced only once every table
  // has been opened successfully.
  vector<shared_ptr<KsckTable>> tables_temp;
  tables_temp.reserve(table_names.size());
  for (const auto& n : table_names) {
    client::sp::shared_ptr<KuduTable> t;
    RETURN_NOT_OK(client_->OpenTable(n, &t));
    shared_ptr<KsckTable> table(new KsckTable(n,
                                              *t->schema().schema_,
                                              t->num_replicas()));
    // Move instead of copy: avoids a needless atomic refcount round trip.
    tables_temp.push_back(std::move(table));
  }
  // Swap instead of assign(): same final contents, with no per-element
  // shared_ptr copies.
  tables->swap(tables_temp);
  return Status::OK();
}
// Populates 'table' with its tablets and their replicas. The information comes
// from the table's scan tokens: each token identifies one tablet and lists
// that tablet's replicas, including which one is the leader.
Status RemoteKsckMaster::RetrieveTabletsList(const shared_ptr<KsckTable>& table) {
  client::sp::shared_ptr<KuduTable> kudu_table;
  RETURN_NOT_OK(client_->OpenTable(table->name(), &kudu_table));

  vector<KuduScanToken*> scan_tokens;
  ElementDeleter token_deleter(&scan_tokens);
  KuduScanTokenBuilder token_builder(kudu_table.get());
  RETURN_NOT_OK(token_builder.Build(&scan_tokens));

  vector<shared_ptr<KsckTablet>> ksck_tablets;
  for (const KuduScanToken* token : scan_tokens) {
    const auto& kudu_tablet = token->tablet();
    shared_ptr<KsckTablet> ksck_tablet(new KsckTablet(table.get(), kudu_tablet.id()));

    vector<shared_ptr<KsckTabletReplica>> ksck_replicas;
    for (const KuduReplica* replica : kudu_tablet.replicas()) {
      ksck_replicas.push_back(shared_ptr<KsckTabletReplica>(
          new KsckTabletReplica(replica->ts().uuid(), replica->is_leader())));
    }
    ksck_tablet->set_replicas(ksck_replicas);
    ksck_tablets.push_back(ksck_tablet);
  }
  table->set_tablets(ksck_tablets);
  return Status::OK();
}
} // namespace tools
} // namespace kudu