// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <cstdint>
#include <iosfwd>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "kudu/common/schema.h"
#include "kudu/gutil/macros.h"
#include "kudu/tools/ksck_results.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/locks.h"
#include "kudu/util/make_shared.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/threadpool.h"
namespace kudu {
// Forward declarations of RPC types that this header only holds through
// std::shared_ptr.
namespace rpc {
class Messenger;
class PeriodicTimer;
} // namespace rpc
namespace tools {
// Forward declarations of ksck types that this header only refers to by raw
// pointer or std::shared_ptr.
class KsckTabletServer;
class KsckTablet;
class KsckCluster;
// Options for checksum scans.
struct KsckChecksumOptions {
  // A checksum with this snapshot timestamp will choose a timestamp for each
  // tablet, from one of the tablet servers hosting a replica, at the
  // time the checksums are started.
  static constexpr uint64_t kCurrentTimestamp = 0;

  // Constructs options with the given table-name and tablet-id filters. The
  // remaining fields are filled in by the out-of-line definition (not visible
  // in this header).
  KsckChecksumOptions(std::vector<std::string> table_filters,
                      std::vector<std::string> tablet_id_filters);

  // Constructs options with the given scan settings. The filters are filled in
  // by the out-of-line definition (not visible in this header).
  KsckChecksumOptions(MonoDelta timeout,
                      MonoDelta idle_timeout,
                      int scan_concurrency,
                      bool use_snapshot,
                      uint64_t snapshot_timestamp);

  // Constructs options with every field specified explicitly.
  KsckChecksumOptions(MonoDelta timeout,
                      MonoDelta idle_timeout,
                      int scan_concurrency,
                      bool use_snapshot,
                      uint64_t snapshot_timestamp,
                      std::vector<std::string> table_filters,
                      std::vector<std::string> tablet_id_filters);

  // The maximum total time to wait for results to come back from all replicas.
  MonoDelta timeout;

  // The maximum amount of time to wait for progress to be made. Progress
  // means at least one additional row is checksummed.
  MonoDelta idle_timeout;

  // The maximum number of concurrent checksum scans to run per tablet server.
  int scan_concurrency;

  // Whether to use a snapshot checksum scanner.
  bool use_snapshot;

  // The snapshot timestamp to use for snapshot checksum scans. See
  // 'kCurrentTimestamp' above for the special value 0.
  uint64_t snapshot_timestamp;

  // Filters for the table names and tablet ids whose contents should be
  // checksummed.
  std::vector<std::string> table_filters;
  std::vector<std::string> tablet_id_filters;
};
typedef std::pair<Status, uint64_t> ReplicaChecksumResult;
typedef std::unordered_map<std::string, ReplicaChecksumResult> TabletChecksumResult;
typedef std::unordered_map<std::string, TabletChecksumResult> TabletChecksumResultsMap;
// A convenience struct containing info needed to checksum a particular tablet.
struct TabletChecksumInfo {
  // Takes both arguments by value and moves them into the members.
  TabletChecksumInfo(std::shared_ptr<KsckTablet> tablet, Schema schema)
      : tablet(std::move(tablet)),
        schema(std::move(schema)) {}

  // The tablet to be checksummed.
  std::shared_ptr<KsckTablet> tablet;

  // The schema of the tablet's table.
  Schema schema;
};
// Tablets to be checksummed, each mapped to its TabletChecksumInfo
// (presumably keyed by tablet id -- confirm against BuildTabletInfoMap()).
using TabletInfoMap = std::unordered_map<std::string, TabletChecksumInfo>;
// Map (tablet server UUID -> number of open slots available for checksum scans).
using TabletServerChecksumScanSlotsMap = std::unordered_map<std::string, int>;
// The tablet servers that checksum scans may run on.
using TabletServerList = std::vector<std::shared_ptr<KsckTabletServer>>;
// Class to coordinate a checksum process. Checksums are started on all replicas
// of a tablet at once while respecting per-tablet-server checksum scan
// concurrency limits.
class KsckChecksumManager : public std::enable_shared_from_this<KsckChecksumManager>,
                            public enable_make_shared<KsckChecksumManager> {
 public:
  // Return in 'manager' a new KsckChecksumManager created from the given
  // parameters. All replicas of every tablet in 'tablet_infos' must be on
  // tablet servers in 'tservers'. Because its ownership is shared with
  // callbacks that are part of the checksum process, a KsckChecksumManager
  // should always be wrapped in a shared_ptr.
  //
  // If 'messenger' is non-null, it will be used by the instance; otherwise, a
  // new messenger will be constructed.
  static Status New(KsckChecksumOptions opts,
                    TabletInfoMap tablet_infos,
                    TabletServerList tservers,
                    std::shared_ptr<rpc::Messenger> messenger,
                    std::shared_ptr<KsckChecksumManager>* manager);

  // Reports an increase in the number of rows and bytes from disk processed
  // by checksums to this KsckChecksumManager. This information is used in
  // progress messages.
  void ReportProgress(int64_t delta_rows, int64_t delta_bytes);

  // Reports to this KsckChecksumManager the result of checksumming replica
  // 'replica_uuid' of tablet 'tablet_id': the scan's 'status' together with
  // the computed 'checksum'.
  void ReportResult(const std::string& tablet_id,
                    const std::string& replica_uuid,
                    const Status& status,
                    uint64_t checksum);

  // The possible final outcomes of a checksum process.
  // NOTE(review): the enumerator names were missing from the reviewed copy of
  // this header; these follow upstream Kudu. Confirm against callers of WaitFor().
  enum class Outcome {
    // All replicas finished their checksums (either successfully or not).
    FINISHED,
    // The checksum process timed out.
    TIMED_OUT,
    // The checksum process went too long without making progress.
    IDLE_TIMED_OUT,
  };

  // Blocks until whichever of the following happens first:
  //  - the number of replica results and errors reported equals the number
  //    of replicas that need to be processed;
  //  - this instance's timeout expires;
  //  - the checksum process makes no progress for longer than this
  //    instance's idle timeout.
  // Progress messages are printed to 'out' if it is non-null.
  Outcome WaitFor(std::ostream* out);

  // Run the checksum process asynchronously.
  // The caller should wait for results with WaitFor().
  Status RunChecksumsAsync();

  // Get a snapshot of results reported so far. The whole map is copied while
  // 'lock_' is held.
  TabletChecksumResultsMap checksums() const {
    std::lock_guard<simple_spinlock> guard(lock_);
    return checksums_;
  }

  // The std::atomic members below already make this class non-copyable. The
  // copy operations are deleted explicitly to make that intent visible.
  KsckChecksumManager(const KsckChecksumManager&) = delete;
  KsckChecksumManager& operator=(const KsckChecksumManager&) = delete;

 protected:
  // Use New() instead, so that instances are always owned by a shared_ptr.
  // Kept protected rather than private on the assumption that
  // enable_make_shared constructs instances through a derived helper.
  // NOTE(review): confirm against kudu/util/make_shared.h.
  KsckChecksumManager(int num_replicas,
                      KsckChecksumOptions opts,
                      TabletInfoMap tablet_infos,
                      TabletServerList tservers,
                      std::shared_ptr<rpc::Messenger> messenger);

 private:
  // Perform post-construction initialization that may fail.
  Status Init();

  // Shutdown this manager.
  void Shutdown();

  // Start as many tablet checksums as possible, given the per-tablet-server
  // concurrency limits on checksum scans.
  // Since this uses a brute force method, it is fairly expensive, and therefore
  // we run it on a threadpool instead of in the callback, which is run from a
  // reactor thread.
  void StartTabletChecksums();

  // Are there enough checksum scan slots available on the tablet servers
  // hosting replicas of 'tablet' to start a checksum scan on all of them?
  // If so, return true and reserve the slots. Else, return false.
  // The 'Unlocked' suffix presumably means the caller must already hold 'lock_'.
  bool ReserveSlotsToChecksumUnlocked(const std::shared_ptr<KsckTablet>& tablet);

  // Begin the checksum on the tablet named in 'tablet_info'.
  void BeginTabletChecksum(const TabletChecksumInfo& tablet_info);

  // Initialize 'ts_slots_open_map_'.
  void InitializeTsSlotsMap();

  // Release a checksum scan slot on each tserver in 'ts_uuids'.
  void ReleaseTsSlotsUnlocked(const std::vector<std::string>& ts_uuids);

  // Are there any open slots at all?
  bool HasOpenTsSlotsUnlocked() const;

  // Returns a summary of checksum scan slot usage across tablet servers.
  // This is useful as debug info to check how saturated the tablet servers are
  // with checksum scans.
  std::string OpenTsSlotSummaryString() const;

  // The options for the checksum process.
  const KsckChecksumOptions opts_;

  // A map of information about tablets to be checksummed. As tablet checksums
  // are started, entries are removed from this map.
  TabletInfoMap tablet_infos_;

  // Tracks the open slots for each tablet server that hosts a replica of the
  // tablets in 'tablet_infos_'.
  TabletServerChecksumScanSlotsMap ts_slots_open_map_;

  // checksums_ is an unordered_map of { tablet_id : { replica_uuid : checksum } }.
  TabletChecksumResultsMap checksums_;

  // Protects 'tablet_infos_', 'ts_slots_open_map_', and 'checksums_'.
  mutable simple_spinlock lock_;

  // The list of tablet servers that checksum scans will be run on. Every
  // replica of every tablet to be checksummed must be located on one of these
  // tablet servers.
  const TabletServerList tservers_;

  // The number of replica results this manager waits for (presumably the
  // 'num_replicas' constructor argument).
  const int expected_replica_count_;

  // Presumably counted down once per reported replica result, which lets
  // WaitFor() detect completion. Confirm in the implementation.
  CountDownLatch responses_;

  // Used for the 'timestamp_update_timer_' periodic timer.
  std::shared_ptr<rpc::Messenger> messenger_;

  // A timer used to periodically refresh the timestamps of the tablet servers
  // in 'tservers_', so that snapshot timestamps don't fall behind the
  // ancient history mark.
  std::shared_ptr<rpc::PeriodicTimer> timestamp_update_timer_;

  // A threadpool for running tasks that find additional tablets that can
  // be checksummed based on available slots on tablet servers.
  std::unique_ptr<ThreadPool> find_tablets_to_checksum_pool_;

  // Running totals of rows and on-disk bytes processed, presumably accumulated
  // by ReportProgress() and used for progress messages.
  std::atomic<int64_t> rows_summed_;
  std::atomic<int64_t> disk_bytes_summed_;
};
// A class for performing checksums on a Kudu cluster.
class KsckChecksummer {
 public:
  // 'cluster' must remain valid as long as this instance is alive.
  explicit KsckChecksummer(KsckCluster* cluster);

  // Checksum the data in the Kudu cluster according to the options provided in
  // 'opts'. Results will be populated in 'checksum_results'. If non-null,
  // progress updates will be written to 'out_for_progress_updates'.
  //
  // NOTE: Even if this method returns a bad Status, 'checksum_results' will be
  // populated with whatever checksum results were received.
  Status ChecksumData(const KsckChecksumOptions& opts,
                      KsckChecksumResults* checksum_results,
                      std::ostream* out_for_progress_updates);

 private:
  // Builds a map of tablets to be checksummed, given the options in 'opts' and
  // the cluster 'cluster_'. The resulting tablets are populated in
  // 'tablet_infos' and the total number of replicas to be checksummed is set
  // in 'num_replicas'.
  Status BuildTabletInfoMap(const KsckChecksumOptions& opts,
                            TabletInfoMap* tablet_infos,
                            int* num_replicas) const;

  // Collates the results of checksums that are reported in 'checksums' into
  // 'table_checksum_map', with the total number of results returned as
  // 'num_results'.
  //
  // NOTE: Even if this function returns a bad Status, 'table_checksum_map'
  // and 'num_results' will still be populated using whatever results are
  // available.
  Status CollateChecksumResults(
      const TabletChecksumResultsMap& checksums,
      KsckTableChecksumMap* table_checksum_map,
      int* num_results) const;

  // Not owned; the caller keeps it alive (see the constructor comment).
  KsckCluster* cluster_;
};
} // namespace tools
} // namespace kudu