// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <algorithm>
#include <cstdlib>
#include <functional>
#include <initializer_list>
#include <iostream>
#include <iterator>
#include <memory>
#include <optional>
#include <string>
#include <tuple>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>

#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rebalance/cluster_status.h"
#include "kudu/rebalance/rebalancer.h"
#include "kudu/tools/ksck.h"
#include "kudu/tools/ksck_remote.h"
#include "kudu/tools/ksck_results.h"
#include "kudu/tools/rebalancer_tool.h"
#include "kudu/tools/tool_action.h"
#include "kudu/tools/tool_action_common.h"
#include "kudu/tools/tool_replica_util.h"
#include "kudu/util/status.h"
#include "kudu/util/string_case.h"
#include "kudu/util/version_util.h"

using kudu::rebalance::Rebalancer;
using kudu::iequals;
using std::cout;
using std::endl;
using std::make_shared;
using std::make_tuple;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Split;
using strings::Substitute;

DECLARE_string(tables);
DECLARE_string(tablets);

DEFINE_string(ignored_tservers, "",
              "UUIDs of tablet servers to ignore while rebalancing the cluster "
              "(comma-separated list). If specified, the tablet servers are "
              "effectively ignored by the rebalancer tool, they are not considered "
              "as a part of the cluster as well as the replicas on them. "
              "If not specified, the rebalancer tool will run on all the tablet servers "
              "in the cluster.");

DEFINE_string(sections, "*",
              "Sections to print (comma-separated list of sections, "
              "available sections are: MASTER_SUMMARIES, TSERVER_SUMMARIES, "
              "VERSION_SUMMARIES, TABLET_SUMMARIES, TABLE_SUMMARIES, "
              "SYSTEM_TABLE_SUMMARIES, CHECKSUM_RESULTS and TOTAL_COUNT.) "
              "If not specified, print all sections.");

DEFINE_uint32(max_moves_per_server, 5,
              "Maximum number of replica moves to perform concurrently on one "
              "tablet server: 'move from' and 'move to' are counted "
              "as separate move operations.");

DEFINE_uint32(max_staleness_interval_sec, 300,
              "Maximum duration of the 'staleness' interval, when the "
              "rebalancer cannot make any progress in scheduling new moves and "
              "no prior scheduled moves are left, even if re-synchronizing "
              "against the cluster's state again and again. Such a staleness "
              "usually happens in case of a persistent problem with the "
              "cluster or when some unexpected concurrent activity is "
              "present (such as automatic recovery of failed replicas, etc.)");

DEFINE_uint32(intra_location_rebalancing_concurrency, 0,
              "How many independent intra-location rebalancing sessions can be "
              "run in parallel. Since the location assignment naturally provides "
              "non-intersecting sets of servers, it's possible to "
              "independently move tablet replicas within different locations "
              "in parallel. Value of 0 means 'the number of CPU cores'. "
              "The actual number of concurrent sessions is the minimum of two "
              "values: this setting and the number of locations in a cluster.");

DEFINE_int64(max_run_time_sec, 0,
             "Maximum time to run the rebalancing, in seconds. Specifying 0 "
             "means not imposing any limit on the rebalancing run time.");

DEFINE_string(move_single_replicas, "auto",
              "Whether to move single replica tablets (i.e. replicas of tablets "
              "of replication factor 1). Acceptable values are: "
              "'auto', 'enabled', 'disabled'. The value of 'auto' means "
              "turn it on/off depending on the replica management scheme "
              "and Kudu version.");

DEFINE_bool(output_replica_distribution_details, false,
            "Whether to output details on per-table and per-server "
            "replica distribution");

DEFINE_bool(report_only, false,
            "Whether to report on table- and cluster-wide replica distribution "
            "skew and exit without doing any actual rebalancing");

DEFINE_bool(disable_policy_fixer, false,
            "In case of multi-location cluster, whether to detect and fix "
            "placement policy violations. Fixing placement policy violations "
            "involves moving tablet replicas across different locations "
            "of the cluster. "
            "This setting is applicable to multi-location clusters only.");

DEFINE_bool(disable_cross_location_rebalancing, false,
            "In case of multi-location cluster, whether to move tablet "
            "replicas between locations in attempt to spread tablet replicas "
            "among location evenly (equalizing loads of locations throughout "
            "the cluster). "
            "This setting is applicable to multi-location clusters only.");

DEFINE_bool(disable_intra_location_rebalancing, false,
            "In case of multi-location cluster, whether to rebalance tablet "
            "replica distribution within each location. "
            "This setting is applicable to multi-location clusters only.");

DEFINE_bool(enable_range_rebalancing, false,
            "Whether to enable table range rebalancing");

DEFINE_bool(move_replicas_from_ignored_tservers, false,
            "Whether to move replicas from the specified 'ignored_tservers' to other "
            "servers when the source tablet server is healthy. "
            "This setting is effective only if the '--ignored_tservers' flag "
            "is specified as well. "
            "If set true, then all ignored tablet servers must be placed into "
            "the 'maintenance mode'.");

DEFINE_double(load_imbalance_threshold,
              kudu::rebalance::Rebalancer::Config::kLoadImbalanceThreshold,
              "The threshold for the per-table location load imbalance. "
              "The threshold is used during the cross-location rebalancing "
              "phase. If the measured cross-location load imbalance for a "
              "table is greater than the specified threshold, the rebalancer "
              "tries to move table's replicas to reduce the imbalance. "
              "The recommended range for the threshold is [0.5, ...) with the "
              "default value of 1.0. The threshold represents a policy "
              "wrt what to prefer: either ideal balance of the cross-location "
              "load on per-table basis (lower threshold value) or minimum "
              "number of replica movements between locations "
              "(greater threshold value). The default value is empirically "
              "proven to be a good choice between 'ideal' and 'good enough' "
              "replica distributions.");

DEFINE_bool(force_rebalance_replicas_on_maintenance_tservers, false,
            "This flag only takes effect in the case that some tservers are set maintenance "
            "mode but not specified in 'ignored_tservers'. If set true, the tool would rebalance "
            "all known replicas among all known tservers. The side effect of this is new "
            "replicas may be moved to maintenance tservers which are likely to be restarted or "
            "decommissioned soon. It is generally more recommended to specify all maintenance "
            "tservers in 'ignored_tservers', so that the rebalancer tool would ignore these "
            "tservers' states and replicas on them when planning replica moves.");

static bool ValidateMoveSingleReplicas(const char* flag_name,
                                       const string& flag_value) {
  const vector<string> allowed_values = { "auto", "enabled", "disabled" };
  if (std::find_if(allowed_values.begin(), allowed_values.end(),
                   [&](const string& allowed_value) {
                     return iequals(allowed_value, flag_value);
                   }) != allowed_values.end()) {
    return true;
  }

  std::ostringstream ss;
  ss << "'" << flag_value << "': unsupported value for --" << flag_name
     << " flag; should be one of ";
  copy(allowed_values.begin(), allowed_values.end(),
       std::ostream_iterator<string>(ss, " "));
  LOG(ERROR) << ss.str();

  return false;
}
DEFINE_validator(move_single_replicas, &ValidateMoveSingleReplicas);

namespace kudu {
namespace tools {

namespace {

Status RunKsck(const RunnerContext& context) {
  vector<string> master_addresses;
  RETURN_NOT_OK(ParseMasterAddresses(context, &master_addresses));
  shared_ptr<KsckCluster> cluster;
  RETURN_NOT_OK_PREPEND(RemoteKsckCluster::Build(master_addresses, &cluster),
                        "unable to build KsckCluster");
  cluster->set_table_filters(Split(FLAGS_tables, ",", strings::SkipEmpty()));
  cluster->set_tablet_id_filters(Split(FLAGS_tablets, ",", strings::SkipEmpty()));
  auto ksck(make_shared<Ksck>(cluster));

  ksck->set_print_sections(Split(FLAGS_sections, ",", strings::SkipEmpty()));

  return ksck->RunAndPrintResults();
}

// Does the version in 'version_str' support movement of single replicas?
// The minimum version required is 1.7.1.
bool VersionSupportsRF1Movement(const string& version_str) {
  Version v;
  if (!ParseVersion(version_str, &v).ok()) {
    return false;
  }
  return make_tuple(v.major, v.minor, v.maintenance) >= make_tuple(1, 7, 1);
}

// Whether it make sense to move replicas of single-replica tablets.
// The output parameter 'move_single_replicas' cannot be null.
//
// Moving replicas of tablets with replication factor one (a.k.a. non-replicated
// tablets) is tricky because of the following:
//
//   * The sequence of Raft configuration updates when moving tablet replica
//     in case of the 3-2-3 replica management scheme.
//
//   * KUDU-2443: even with the 3-4-3 replica management scheme, moving of
//     non-replicated tablets is not possible for the versions prior to the fix.
//
Status EvaluateMoveSingleReplicasFlag(const vector<string>& master_addresses,
                                      bool* move_single_replicas) {
  DCHECK(move_single_replicas);
  if (!iequals(FLAGS_move_single_replicas, "auto")) {
    if (iequals(FLAGS_move_single_replicas, "enabled")) {
      *move_single_replicas = true;
    } else {
      DCHECK(iequals(FLAGS_move_single_replicas, "disabled"));
      *move_single_replicas = false;
    }
    return Status::OK();
  }

  shared_ptr<KsckCluster> cluster;
  RETURN_NOT_OK_PREPEND(RemoteKsckCluster::Build(master_addresses, &cluster),
                        "unable to build KsckCluster");
  auto ksck(make_shared<Ksck>(cluster));

  // Ignoring the result of the Ksck::Run() method: it's possible the cluster
  // is not completely healthy but rebalancing can proceed; for example,
  // if a leader election is occurring.
  ignore_result(ksck->Run());
  const auto& ksck_results = ksck->results();

  for (const auto& summaries : { ksck_results.cluster_status.tserver_summaries,
                                 ksck_results.cluster_status.master_summaries }) {
    for (const auto& summary : summaries) {
      if (summary.version) {
        if (!VersionSupportsRF1Movement(*summary.version)) {
          LOG(INFO) << "found Kudu server of version '" << *summary.version
                    << "'; not rebalancing single-replica tablets as a result";
          *move_single_replicas = false;
          return Status::OK();
        }
      } else {
        LOG(INFO) << "no version information from some servers; "
                  << "not rebalancing single-replica tablets as the result";
        *move_single_replicas = false;
        return Status::OK();
      }
    }
  }

  // Now check for the replica management scheme. If it's the 3-2-3 scheme,
  // don't move replicas of non-replicated (a.k.a. RF=1) tablets. The reasoning
  // is simple: in Raft it's necessary to get acknowledgement from the majority
  // of voter replicas to commit a write operation. In case of the 3-2-3 scheme
  // and non-replicated tablets the majority is two out of two tablet replicas
  // because the destination replica is added as a voter. In case of huge amount
  // of data in the tablet or frequent updates, it might take a long time for the
  // destination replica to catch up. During that time the tablet would not be
  // available. The idea is to reduce the risk of unintended unavailability
  // unless it's explicitly requested by the operator.
  std::optional<string> tid;
  if (!ksck_results.cluster_status.tablet_summaries.empty()) {
    tid = ksck_results.cluster_status.tablet_summaries.front().id;
  }
  bool is_343_scheme = false;
  auto s = Is343SchemeCluster(master_addresses, tid, &is_343_scheme);
  if (!s.ok()) {
    LOG(WARNING) << s.ToString() << ": failed to get information "
        "on the replica management scheme; not rebalancing "
        "single-replica tablets as the result";
    *move_single_replicas = false;
    return Status::OK();
  }

  *move_single_replicas = is_343_scheme;
  return Status::OK();
}

// Rebalance the cluster. The process is run step-by-step, where at each step
// a new batch of move operations is output by the algorithm. As many as
// possible replica movements from one batch are performed concurrently, while
// running not more than the specified number of concurrent replica movements
// at each tablet server. In other words, at every moment a single tablet server
// can be the source and the destination of no more than the specified number of
// move operations.
Status RunRebalance(const RunnerContext& context) {
  const vector<string> ignored_tservers =
      Split(FLAGS_ignored_tservers, ",", strings::SkipEmpty());
  vector<string> master_addresses;
  RETURN_NOT_OK(ParseMasterAddresses(context, &master_addresses));
  const vector<string> table_filters =
      Split(FLAGS_tables, ",", strings::SkipEmpty());

  if (FLAGS_enable_range_rebalancing && table_filters.size() != 1) {
    return Status::NotSupported(
        "range rebalancing is currently implemented for a single table only: "
        "use '--tables' to specify a table for range rebalancing");
  }

  // Evaluate --move_single_replicas flag: decide whether enable to disable
  // moving of single-replica tablets based on the reported version of the
  // Kudu components.
  bool move_single_replicas = false;
  RETURN_NOT_OK(EvaluateMoveSingleReplicasFlag(master_addresses,
                                               &move_single_replicas));
  RebalancerTool rebalancer(Rebalancer::Config(
      ignored_tservers,
      master_addresses,
      table_filters,
      FLAGS_max_moves_per_server,
      FLAGS_max_staleness_interval_sec,
      FLAGS_max_run_time_sec,
      FLAGS_move_replicas_from_ignored_tservers,
      move_single_replicas,
      FLAGS_output_replica_distribution_details,
      !FLAGS_disable_policy_fixer,
      !FLAGS_disable_cross_location_rebalancing,
      !FLAGS_disable_intra_location_rebalancing,
      FLAGS_load_imbalance_threshold,
      FLAGS_force_rebalance_replicas_on_maintenance_tservers,
      FLAGS_intra_location_rebalancing_concurrency,
      FLAGS_enable_range_rebalancing));

  // Print info on pre-rebalance distribution of replicas.
  RETURN_NOT_OK(rebalancer.PrintStats(cout));

  if (FLAGS_report_only) {
    return Status::OK();
  }

  RebalancerTool::RunStatus result_status;
  size_t moves_count;
  RETURN_NOT_OK(rebalancer.Run(&result_status, &moves_count));

  const string msg_template = "rebalancing is complete: $0 (moved $1 replicas)";
  string msg_result_status;
  switch (result_status) {
    case RebalancerTool::RunStatus::CLUSTER_IS_BALANCED:
      msg_result_status = "cluster is balanced";
      break;
    case RebalancerTool::RunStatus::TIMED_OUT:
      msg_result_status = "time is up";
      break;
    default:
      msg_result_status = "unexpected rebalancer status";
      DCHECK(false) << msg_result_status;
      break;
  }
  cout << endl << Substitute(msg_template, msg_result_status, moves_count) << endl;

  if (moves_count != 0) {
    // Print info on post-rebalance distribution of replicas, if any moves
    // were performed at all.
    RETURN_NOT_OK(rebalancer.PrintStats(cout));
  }

  return Status::OK();
}

} // anonymous namespace

unique_ptr<Mode> BuildClusterMode() {
  ModeBuilder builder("cluster");
  builder.Description("Operate on a Kudu cluster");

  {
    constexpr auto desc = "Check the health of a Kudu cluster";
    constexpr auto extra_desc = "By default, ksck checks that master and "
        "tablet server processes are running, and that table metadata is "
        "consistent. Use the 'checksum' flag to check that tablet data is "
        "consistent (also see the 'tables' and 'tablets' flags). Use the "
        "'checksum_snapshot' along with 'checksum' if the table or tablets "
        "are actively receiving inserts or updates. Use the 'ksck_format' flag "
        "to output detailed information on cluster status even if no "
        "inconsistency is found in metadata.";

    unique_ptr<Action> ksck = ClusterActionBuilder("ksck", &RunKsck)
        .Description(desc)
        .ExtraDescription(extra_desc)
        .AddOptionalParameter("checksum_cache_blocks")
        .AddOptionalParameter("checksum_scan")
        .AddOptionalParameter("checksum_scan_concurrency")
        .AddOptionalParameter("checksum_snapshot")
        .AddOptionalParameter("checksum_timeout_sec")
        .AddOptionalParameter("color")
        .AddOptionalParameter("consensus")
        .AddOptionalParameter("fetch_info_concurrency")
        .AddOptionalParameter("flags_categories_to_check")
        .AddOptionalParameter("ksck_format")
        .AddOptionalParameter("quiescing_info")
        .AddOptionalParameter("sections")
        .AddOptionalParameter("tables")
        .AddOptionalParameter("tablets")
        .Build();
    builder.AddAction(std::move(ksck));
  }

  {
    constexpr auto desc = "Move tablet replicas between tablet servers to "
        "balance replica counts for each table and for the cluster as a whole.";
    constexpr auto extra_desc = "The rebalancing tool moves tablet replicas "
        "between tablet servers, in the same manner as the "
        "'kudu tablet change_config move_replica' command, attempting to "
        "balance the count of replicas per table on each tablet server, "
        "and after that attempting to balance the total number of replicas "
        "per tablet server.";
    unique_ptr<Action> rebalance = ClusterActionBuilder("rebalance", &RunRebalance)
        .Description(desc)
        .ExtraDescription(extra_desc)
        .AddOptionalParameter("disable_cross_location_rebalancing")
        .AddOptionalParameter("disable_intra_location_rebalancing")
        .AddOptionalParameter("disable_policy_fixer")
        .AddOptionalParameter("fetch_info_concurrency")
        .AddOptionalParameter("force_rebalance_replicas_on_maintenance_tservers")
        .AddOptionalParameter("ignored_tservers")
        .AddOptionalParameter("intra_location_rebalancing_concurrency")
        .AddOptionalParameter("load_imbalance_threshold")
        .AddOptionalParameter("max_moves_per_server")
        .AddOptionalParameter("max_run_time_sec")
        .AddOptionalParameter("max_staleness_interval_sec")
        .AddOptionalParameter("move_replicas_from_ignored_tservers")
        .AddOptionalParameter("move_single_replicas")
        .AddOptionalParameter("output_replica_distribution_details")
        .AddOptionalParameter("enable_range_rebalancing")
        .AddOptionalParameter("report_only")
        .AddOptionalParameter("tables")
        .Build();
    builder.AddAction(std::move(rebalance));
  }

  return builder.Build();
}

} // namespace tools
} // namespace kudu
