// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once

#include <condition_variable> // IWYU pragma: keep
#include <cstdint>
#include <ctime>
#include <iosfwd>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <random>
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/rebalance/rebalance_algo.h"
#include "kudu/rebalance/rebalancer.h"
#include "kudu/tools/ksck.h"
#include "kudu/util/locks.h"
#include "kudu/util/monotime.h"     // IWYU pragma: keep
#include "kudu/util/status.h"

namespace kudu {

// Forward declaration: this header only holds KuduClient through a
// client::sp::shared_ptr handle (BaseRunner::client_), so the complete type
// is not required here.
namespace client {
class KuduClient;
} // namespace client

namespace tools {

// Forward declaration: KsckResults is only referenced by const reference
// in this header (see KsckResultsToClusterRawInfo()).
struct KsckResults;

// A class implementing logic for Kudu cluster rebalancing.
// This class inherits from rebalance::Rebalancer but also
// implements additional functions to print cluster balance
// information.
class RebalancerTool : public rebalance::Rebalancer {
 public:
  // Create Rebalancer object with the specified configuration.
  explicit RebalancerTool(const Config& config);

  // Print the stats on the cluster balance information into the 'out' stream.
  Status PrintStats(std::ostream& out);

  // Run the rebalancing: start the process and return once the balancing
  // criteria are satisfied or if an error occurs. The number of attempted
  // moves is output into the 'moves_count' parameter (if the parameter is
  // not null). The 'result_status' output parameter cannot be null.
  Status Run(RunStatus* result_status, size_t* moves_count = nullptr);

 private:
  // Common base for a few Runner implementations.
  class BaseRunner : public Runner {
   public:
    BaseRunner(RebalancerTool* rebalancer,
               std::unordered_set<std::string> ignored_tservers,
               size_t max_moves_per_server,
               std::optional<MonoTime> deadline);

    Status Init(std::vector<std::string> master_addresses) override;

    Status GetNextMoves(bool* has_moves) override;

    uint32_t moves_count() const override {
      return moves_count_;
    }

   protected:
    // Get next batch of replica moves from the rebalancing algorithm.
    // Essentially, it runs ksck against the cluster and feeds the data into the
    // rebalancing algorithm along with the information on currently pending
    // replica movement operations. The information returned by the high-level
    // rebalancing algorithm is translated into particular replica movement
    // instructions, which are used to populate the 'moves' parameter
    // (the container is cleared first).
    virtual Status GetNextMovesImpl(std::vector<Rebalancer::ReplicaMove>* moves) = 0;

    // Update the helper containers once a scheduled operation is complete
    // (i.e. succeeded or failed).
    void UpdateOnMoveCompleted(const std::string& ts_uuid);

    // Check if all the tablet servers (excluding those specified in 'ignored_tservers')
    // are healthy and available for replica placement.
    Status CheckTabletServers(const rebalance::ClusterRawInfo& raw_info);

    // A pointer to the Rebalancer object.
    RebalancerTool* rebalancer_;

    // A set of ignored tablet server UUIDs.
    const std::unordered_set<std::string> ignored_tservers_;

    // Maximum allowed number of move operations per server. For a move
    // operation, a source replica adds +1 at the source server and the target
    // replica adds +1 at the destination server.
    const size_t max_moves_per_server_;

    // Deadline for the activity performed by the Runner class in the
    // ScheduleNextMove() and UpdateMovesInProgressStatus() methods;
    // 'std::nullopt' if no timeout is set.
    const std::optional<MonoTime> deadline_;

    // Client object to make queries to Kudu masters for various auxiliary info
    // while scheduling move operations and monitoring their status.
    client::sp::shared_ptr<client::KuduClient> client_;

    // Information on scheduled replica movement operations; keys are
    // tablet UUIDs, values are ReplicaMove structures.
    Rebalancer::MovesInProgress scheduled_moves_;

    // Number of successfully completed replica move operations. Default-
    // initialized so the counter is well-defined even if a constructor
    // does not set it explicitly.
    uint32_t moves_count_ = 0;

    // Kudu cluster RPC end-points.
    std::vector<std::string> master_addresses_;

    // Mapping 'tserver UUID' --> 'scheduled move operations count'.
    std::unordered_map<std::string, int32_t> op_count_per_ts_;

    // Mapping 'scheduled move operations count' --> 'tserver UUID'. That's
    // just reversed 'op_count_per_ts_'.
    std::multimap<int32_t, std::string> ts_per_op_count_;
  }; // class BaseRunner

  // Runner that leverages RebalancingAlgo interface for rebalancing.
  class AlgoBasedRunner : public BaseRunner {
   public:
    // The 'ignored_tservers' specifies tablet servers that could be
    // ignored by rebalancer.
    // The 'max_moves_per_server' specifies the maximum number of operations
    // per tablet server (both the source and the destination are counted in).
    // The 'deadline' specifies the deadline for the run, 'std::nullopt'
    // if no timeout is set. The location the runner operates at is provided
    // by the subclass via location().
    AlgoBasedRunner(RebalancerTool* rebalancer,
                    std::unordered_set<std::string> ignored_tservers,
                    size_t max_moves_per_server,
                    std::optional<MonoTime> deadline);

    Status Init(std::vector<std::string> master_addresses) override;

    void LoadMoves(std::vector<Rebalancer::ReplicaMove> replica_moves) override;

    bool ScheduleNextMove(bool* has_errors, bool* timed_out) override;

    bool UpdateMovesInProgressStatus(bool* has_errors,
                                     bool* timed_out,
                                     bool* has_pending_moves) override;

    // Get the cluster location the runner is slated to run/running at.
    // 'std::nullopt' means all the cluster.
    virtual const std::optional<std::string>& location() const = 0;

    // Rebalancing algorithm that the runner uses to find replica moves.
    virtual rebalance::RebalancingAlgo* algorithm() = 0;

   protected:
    Status GetNextMovesImpl(std::vector<Rebalancer::ReplicaMove>* replica_moves) override;

    // Using the helper containers src_op_indices_ and dst_op_indices_,
    // find the index of the most optimal replica movement operation
    // and output the index into the 'op_idx' parameter.
    bool FindNextMove(size_t* op_idx);

    // Update the helper containers once a move operation has been scheduled.
    void UpdateOnMoveScheduled(size_t idx,
                               const std::string& tablet_uuid,
                               const std::string& src_ts_uuid,
                               const std::string& dst_ts_uuid,
                               bool is_success);

    // Auxiliary method used by UpdateOnMoveScheduled() implementation.
    void UpdateOnMoveScheduledImpl(
        size_t idx,
        const std::string& ts_uuid,
        bool is_success,
        std::unordered_map<std::string, std::set<size_t>>* op_indices);

    // The moves to schedule.
    std::vector<Rebalancer::ReplicaMove> replica_moves_;

    // Mapping 'tserver UUID' --> 'indices of move operations having the
    // tserver UUID (i.e. the key) as the source of the move operation'.
    std::unordered_map<std::string, std::set<size_t>> src_op_indices_;

    // Mapping 'tserver UUID' --> 'indices of move operations having the
    // tserver UUID (i.e. the key) as the destination of the move operation'.
    std::unordered_map<std::string, std::set<size_t>> dst_op_indices_;

    // Random device and generator for selecting among multiple choices, when
    // appropriate.
    std::random_device random_device_;
    std::mt19937 random_generator_;
  }; // class AlgoBasedRunner

  class IntraLocationRunner : public AlgoBasedRunner {
   public:
    // The 'ignored_tservers' specifies tablet servers that could be
    // ignored by rebalancer.
    // The 'max_moves_per_server' specifies the maximum number of operations
    // per tablet server (both the source and the destination are counted in).
    // The 'deadline' specifies the deadline for the run, 'std::nullopt'
    // if no timeout is set.
    // The 'location' specifies the cluster location to rebalance within.
    // In case of non-location aware cluster or if there is just one location
    // defined in the whole cluster, the whole cluster will be rebalanced.
    IntraLocationRunner(RebalancerTool* rebalancer,
                        std::unordered_set<std::string> ignored_tservers,
                        size_t max_moves_per_server,
                        std::optional<MonoTime> deadline,
                        std::string location);

    rebalance::RebalancingAlgo* algorithm() override {
      return &algorithm_;
    }

    const std::optional<std::string>& location() const override {
      return location_;
    }

   private:
    const std::optional<std::string> location_;

    // An instance of the balancing algorithm.
    rebalance::TwoDimensionalGreedyAlgo algorithm_;
  };

  class CrossLocationRunner : public AlgoBasedRunner {
   public:
    // The 'ignored_tservers' specifies tablet servers that could be
    // ignored by rebalancer.
    // The 'max_moves_per_server' specifies the maximum number of operations
    // per tablet server (both the source and the destination are counted in).
    // The 'load_imbalance_threshold' specifies the threshold for the
    // balancing algorithm used for finding the most optimal replica movements.
    // The 'deadline' specifies the deadline for the run, 'std::nullopt'
    // if no timeout is set.
    CrossLocationRunner(RebalancerTool* rebalancer,
                        std::unordered_set<std::string> ignored_tservers,
                        size_t max_moves_per_server,
                        double load_imbalance_threshold,
                        std::optional<MonoTime> deadline);

    rebalance::RebalancingAlgo* algorithm() override {
      return &algorithm_;
    }

    const std::optional<std::string>& location() const override {
      return location_;
    }

   private:
    // Cross-location rebalancing is about the whole cluster, hence no location.
    const std::optional<std::string> location_ = std::nullopt;

    // An instance of the balancing algorithm.
    rebalance::LocationBalancingAlgo algorithm_;
  };

  // Runner that leverages 'SetReplace' method to move replicas.
  class ReplaceBasedRunner : public BaseRunner {
   public:
    // The 'ignored_tservers' specifies tablet servers that could be
    // ignored by rebalancer.
    // The 'max_moves_per_server' specifies the maximum number of operations
    // per tablet server (both the source and the destination are counted in).
    // The 'deadline' specifies the deadline for the run, 'std::nullopt'
    // if no timeout is set.
    ReplaceBasedRunner(RebalancerTool* rebalancer,
                       std::unordered_set<std::string> ignored_tservers,
                       size_t max_moves_per_server,
                       std::optional<MonoTime> deadline);

    Status Init(std::vector<std::string> master_addresses) override;

    void LoadMoves(std::vector<Rebalancer::ReplicaMove> replica_moves) override;

    bool ScheduleNextMove(bool* has_errors, bool* timed_out) override;

    bool UpdateMovesInProgressStatus(bool* has_errors,
                                     bool* timed_out,
                                     bool* has_pending_moves) override;

   protected:
    // Key is tserver UUID which corresponds to value.ts_uuid_from.
    // Subclasses inherit this alias; they should not redeclare it.
    using MovesToSchedule = std::unordered_multimap<std::string, Rebalancer::ReplicaMove>;

    Status GetNextMovesImpl(std::vector<Rebalancer::ReplicaMove>* replica_moves) override;

    // Compute the replica moves specific to the concrete runner from the
    // cluster information in 'ci' and 'raw_info', outputting them into
    // the 'replica_moves' parameter.
    virtual Status GetReplaceMoves(const rebalance::ClusterInfo& ci,
                                   const rebalance::ClusterRawInfo& raw_info,
                                   std::vector<Rebalancer::ReplicaMove>* replica_moves) = 0;

    // Pick the next move to schedule from 'moves_to_schedule_' and output it
    // into the 'move' parameter.
    // NOTE(review): the return value presumably indicates whether a suitable
    // move was found -- confirm against the implementation.
    bool FindNextMove(Rebalancer::ReplicaMove* move);

    // Update the helper containers once a move operation has been scheduled.
    void UpdateOnMoveScheduled(Rebalancer::ReplicaMove move);

    // Moves yet to schedule.
    MovesToSchedule moves_to_schedule_;
  };

  class PolicyFixer : public ReplaceBasedRunner {
   public:
    PolicyFixer(RebalancerTool* rebalancer,
                std::unordered_set<std::string> ignored_tservers,
                size_t max_moves_per_server,
                std::optional<MonoTime> deadline);

   private:
    // Get replica moves to restore the placement policy restrictions.
    // If it returns Status::OK() with 'replica_moves' empty, the distribution
    // of tablet replicas is considered to conform to the main constraint of
    // the placement policy.
    Status GetReplaceMoves(const rebalance::ClusterInfo& ci,
                           const rebalance::ClusterRawInfo& raw_info,
                           std::vector<Rebalancer::ReplicaMove>* replica_moves) override;
  };

  class IgnoredTserversRunner : public ReplaceBasedRunner {
   public:
    IgnoredTserversRunner(RebalancerTool* rebalancer,
                          std::unordered_set<std::string> ignored_tservers,
                          size_t max_moves_per_server,
                          std::optional<MonoTime> deadline);

   private:
    // Get replica moves to move replicas from healthy ignored tservers.
    // If it returns Status::OK() with 'replica_moves' empty, there are
    // no replicas on the healthy ignored tservers.
    Status GetReplaceMoves(const rebalance::ClusterInfo& ci,
                           const rebalance::ClusterRawInfo& raw_info,
                           std::vector<Rebalancer::ReplicaMove>* replica_moves) override;

    // Return Status::OK() if it's safe to move all replicas from the tablet
    // servers in 'tservers_to_empty' to other servers, and all those tablet
    // servers are in maintenance mode.
    static Status CheckIgnoredTServers(const rebalance::ClusterRawInfo& raw_info,
                                       const rebalance::ClusterInfo& ci,
                                       const std::unordered_set<std::string>& tservers_to_empty);

    // Populate 'replica_moves' with moves of replicas away from the tablet
    // servers described by 'ignored_tservers_info'.
    void GetMovesFromIgnoredTservers(const TServersToEmptyMap& ignored_tservers_info,
                                     std::vector<Rebalancer::ReplicaMove>* replica_moves);

    // Random device and generator for selecting among multiple choices, when appropriate.
    std::random_device random_device_;
    std::mt19937 random_generator_;
  };

  // Convert ksck results into information relevant to rebalancing the cluster
  // at the location specified by 'location' parameter ('std::nullopt' for
  // 'location' means that's about cross-location rebalancing). Basically,
  // 'raw' information is just a sub-set of relevant fields of the KsckResults
  // structure filtered to contain information only for the specified location.
  Status KsckResultsToClusterRawInfo(const std::optional<std::string>& location,
                                     const KsckResults& ksck_info,
                                     rebalance::ClusterRawInfo* raw_info);

  // Print replica count information about the tablet servers that need to be
  // emptied.
  Status PrintIgnoredTserversStats(const rebalance::ClusterRawInfo& raw_info,
                                   std::ostream& out) const;

  // Print information on the cross-location balance.
  Status PrintCrossLocationBalanceStats(const rebalance::ClusterInfo& ci,
                                        std::ostream& out) const;

  // Print statistics for the specified location. If 'location' is an empty
  // string, that's about printing the cluster-wide stats for a cluster that
  // doesn't have any locations defined.
  Status PrintLocationBalanceStats(const std::string& location,
                                   const rebalance::ClusterRawInfo& raw_info,
                                   const rebalance::ClusterInfo& ci,
                                   std::ostream& out) const;

  // Print information on placement policy violations derived from 'raw_info'
  // into the 'out' stream.
  Status PrintPolicyViolationInfo(const rebalance::ClusterRawInfo& raw_info,
                                  std::ostream& out) const;

  // Run rebalancing using the specified runner.
  Status RunWith(Runner* runner, RunStatus* result_status);

  // Refresh the information on the cluster for the specified location
  // (involves running ksck).
  Status GetClusterRawInfo(const std::optional<std::string>& location,
                           rebalance::ClusterRawInfo* raw_info);

  // Reset ksck-related fields and run ksck against the cluster.
  Status RefreshKsckResults();

  // Auxiliary Ksck object to get information on the cluster.
  std::unique_ptr<Ksck> ksck_;    // protected by ksck_lock_
  rw_spinlock ksck_lock_;

  bool ksck_refreshing_{false};   // protected by ksck_refresh_lock_
  Status ksck_refresh_status_;    // protected by ksck_refresh_lock_
  std::mutex ksck_refresh_lock_;
  std::condition_variable ksck_refresh_cv_;
};

} // namespace tools
} // namespace kudu
