// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <cstdint>
#include <functional>
#include <iostream>
#include <limits>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>

#include "kudu/client/client.h"
#include "kudu/client/row_result.h"
#include "kudu/client/scan_batch.h"
#include "kudu/client/scanner-internal.h"
#include "kudu/client/schema.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/partition.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus.proxy.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/strings/human_readable.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/split.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/server/server_base.pb.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/tools/tool_action.h"
#include "kudu/tools/tool_action_common.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/tserver/tserver_admin.pb.h"
#include "kudu/tserver/tserver_admin.proxy.h"
#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/util/monotime.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/status.h"

DEFINE_bool(force_copy, false,
            "Force the copy when the destination tablet server has this replica");
DEFINE_bool(include_schema, true,
            "Whether to include the schema of each replica");
DECLARE_string(table_name);
DECLARE_string(tablets);
DECLARE_int64(timeout_ms); // defined in ksck

using kudu::client::KuduRowResult;
using kudu::client::KuduScanBatch;
using kudu::client::KuduSchema;
using kudu::consensus::ConsensusServiceProxy;
using kudu::consensus::RaftConfigPB;
using kudu::consensus::RaftPeerPB;
using kudu::consensus::StartTabletCopyRequestPB;
using kudu::consensus::StartTabletCopyResponsePB;
using kudu::rpc::RpcController;
using kudu::server::ServerStatusPB;
using kudu::tablet::TabletStatusPB;
using kudu::tserver::DeleteTabletRequestPB;
using kudu::tserver::DeleteTabletResponsePB;
using kudu::tserver::ListTabletsRequestPB;
using kudu::tserver::ListTabletsResponsePB;
using kudu::tserver::NewScanRequestPB;
using kudu::tserver::ScanRequestPB;
using kudu::tserver::ScanResponsePB;
using kudu::tserver::TabletServer;
using kudu::tserver::TabletServerAdminServiceProxy;
using kudu::tserver::TabletServerErrorPB;
using kudu::tserver::TabletServerServiceProxy;
using std::cerr;
using std::cout;
using std::endl;
using std::string;
using std::unique_ptr;
using std::unordered_set;
using std::vector;

namespace kudu {
namespace tools {

// Thin holder for Dump(): wrapping the function in a class gives KuduSchema
// and KuduScanBatch a single name they can easily declare as a friend.
class ReplicaDumper {
 public:
  // Scans the replica 'tablet_id' through 'proxy', projecting every column of
  // 'schema', and writes each returned row to stdout, one row per line.
  //
  // Returns a non-OK status if the projection cannot be built, if a Scan()
  // RPC fails, if a scan response carries an error, or if a batch of results
  // cannot be loaded.
  static Status Dump(const Schema& schema,
                     const string& tablet_id,
                     TabletServerServiceProxy* proxy) {
    KuduSchema client_schema(schema);
    const MonoDelta rpc_timeout = MonoDelta::FromMilliseconds(FLAGS_timeout_ms);

    ScanRequestPB scan_req;
    ScanResponsePB scan_resp;

    // Set up the initial scan request.
    // The intent is a READ_LATEST scan, since the tablet being dumped may have
    // lost its majority and thus be unable to serve snapshot scans.
    // NOTE(review): no read mode is set explicitly below, so this relies on
    // the request's default read mode -- confirm it is READ_LATEST.
    // TODO(dalves) When KUDU-1704 is in change this to perform stale snapshot
    // reads, which can be ordered.
    NewScanRequestPB* new_scan = scan_req.mutable_new_scan_request();
    RETURN_NOT_OK(SchemaToColumnPBs(
        schema, new_scan->mutable_projected_columns(),
        SCHEMA_PB_WITHOUT_IDS | SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES));
    new_scan->set_tablet_id(tablet_id);
    new_scan->set_cache_blocks(false);

    do {
      // A fresh controller is used for every round trip.
      RpcController rpc;
      rpc.set_timeout(rpc_timeout);
      RETURN_NOT_OK_PREPEND(proxy->Scan(scan_req, &scan_resp, &rpc),
                            "Scan() failed");

      if (scan_resp.has_error()) {
        return Status::IOError("Failed to read: ",
                               pb_util::SecureShortDebugString(scan_resp.error()));
      }

      // Once the server hands back a scanner ID, switch the request over from
      // "open a new scan" to "continue the scan with this ID".
      if (scan_resp.has_scanner_id()) {
        scan_req.set_scanner_id(scan_resp.scanner_id());
        scan_req.clear_new_scan_request();
      }
      scan_req.set_call_seq_id(scan_req.call_seq_id() + 1);

      // Only responses that actually carry data have rows to print.
      if (scan_resp.has_data()) {
        KuduScanBatch::Data batch;
        RETURN_NOT_OK(batch.Reset(&rpc,
                                  &schema,
                                  &client_schema,
                                  client::KuduScanner::NO_FLAGS,
                                  &scan_resp));
        vector<KuduRowResult> rows;
        batch.ExtractRows(&rows);
        for (const auto& row : rows) {
          cout << row.ToString() << endl;
        }
      }
    } while (scan_resp.has_more_results());
    return Status::OK();
  }
};

namespace {

// Name of the required argument carrying the reason for deleting a replica
// (used by the "delete" action).
constexpr const char* const kReasonArg = "reason";
// Names of the required arguments identifying the source and destination
// tablet servers of a tablet copy (used by the "copy" action).
constexpr const char* const kSrcAddressArg = "src_address";
constexpr const char* const kDstAddressArg = "dst_address";
// Name and description of the variadic argument listing the peers of the new
// Raft config (used by the "unsafe_change_config" action).
constexpr const char* const kPeerUUIDsArg = "peer uuids";
constexpr const char* const kPeerUUIDsArgDesc =
    "List of peer uuids to be part of new config, separated by whitespace";


// Issues a ListTablets RPC through 'proxy' and stores every returned
// status-and-schema entry in 'replicas', replacing its previous contents.
//
// Returns the RPC failure, or the status embedded in the response if the
// server reported an error; 'replicas' is left untouched in both cases.
Status GetReplicas(TabletServerServiceProxy* proxy,
                   vector<ListTabletsResponsePB::StatusAndSchemaPB>* replicas) {
  RpcController controller;
  controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));

  // The request is deliberately left at its defaults: even when
  // FLAGS_include_schema=false we do not set need_schema_info=false, because
  // the schema is still required to decode each replica's partition, and the
  // partition information is pretty much always nice to have.
  ListTabletsRequestPB list_req;
  ListTabletsResponsePB list_resp;
  RETURN_NOT_OK(proxy->ListTablets(list_req, &list_resp, &controller));
  if (list_resp.has_error()) {
    return StatusFromPB(list_resp.error().status());
  }

  const auto& entries = list_resp.status_and_schema();
  replicas->assign(entries.begin(), entries.end());
  return Status::OK();
}

// Action handler for "check": lists the replicas on the tablet server given
// by the kTServerAddressArg argument and verifies that each one is either
// running or tombstoned.
//
// Every offending replica is reported on stderr. Returns OK (after printing a
// confirmation to stdout) when there are none, IllegalState otherwise.
Status CheckReplicas(const RunnerContext& context) {
  const string& address = FindOrDie(context.required_args, kTServerAddressArg);

  unique_ptr<TabletServerServiceProxy> proxy;
  RETURN_NOT_OK(BuildProxy(address, tserver::TabletServer::kDefaultPort,
                           &proxy));

  vector<ListTabletsResponsePB::StatusAndSchemaPB> replicas;
  RETURN_NOT_OK(GetReplicas(proxy.get(), &replicas));

  int num_not_running = 0;
  for (const auto& replica : replicas) {
    const TabletStatusPB& ts = replica.tablet_status();
    if (ts.state() == tablet::RUNNING) {
      continue;
    }
    // A tombstoned replica is allowed to be in a non-running state.
    if (ts.tablet_data_state() == tablet::TABLET_DATA_TOMBSTONED) {
      continue;
    }
    cerr << "Tablet id: " << ts.tablet_id() << " is "
         << tablet::TabletStatePB_Name(ts.state()) << endl;
    ++num_not_running;
  }

  if (num_not_running != 0) {
    return Status::IllegalState("Not all tablet replicas are running");
  }
  cout << "All tablet replicas are running" << endl;
  return Status::OK();
}

// Action handler for "delete": asks the tablet server given by the
// kTServerAddressArg argument to delete the replica named by kTabletIdArg,
// using the TABLET_DATA_TOMBSTONED delete type and recording the kReasonArg
// text as the reason.
//
// The server's status is fetched first so that the request can be addressed
// to its permanent UUID. Returns IOError if the server's response carries an
// error.
Status DeleteReplica(const RunnerContext& context) {
  const auto& args = context.required_args;
  const string& address = FindOrDie(args, kTServerAddressArg);
  const string& tablet_id = FindOrDie(args, kTabletIdArg);
  const string& reason = FindOrDie(args, kReasonArg);

  ServerStatusPB server_status;
  RETURN_NOT_OK(GetServerStatus(address, tserver::TabletServer::kDefaultPort,
                                &server_status));

  unique_ptr<TabletServerAdminServiceProxy> admin_proxy;
  RETURN_NOT_OK(BuildProxy(address, tserver::TabletServer::kDefaultPort,
                           &admin_proxy));

  DeleteTabletRequestPB delete_req;
  delete_req.set_tablet_id(tablet_id);
  delete_req.set_dest_uuid(server_status.node_instance().permanent_uuid());
  delete_req.set_reason(reason);
  delete_req.set_delete_type(tablet::TABLET_DATA_TOMBSTONED);

  RpcController controller;
  controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));

  DeleteTabletResponsePB delete_resp;
  RETURN_NOT_OK_PREPEND(
      admin_proxy->DeleteTablet(delete_req, &delete_resp, &controller),
      "DeleteTablet() failed");
  if (!delete_resp.has_error()) {
    return Status::OK();
  }
  return Status::IOError("Failed to delete tablet: ",
                         pb_util::SecureShortDebugString(delete_resp.error()));
}

// Action handler for "dump": looks up the schema of the replica named by
// kTabletIdArg on the tablet server given by kTServerAddressArg, then hands
// off to ReplicaDumper::Dump() to scan and print the replica's rows.
//
// Returns NotFound if no schema could be obtained for the requested tablet ID
// from the server's replica list.
Status DumpReplica(const RunnerContext& context) {
  const auto& args = context.required_args;
  const string& address = FindOrDie(args, kTServerAddressArg);
  const string& tablet_id = FindOrDie(args, kTabletIdArg);

  unique_ptr<TabletServerServiceProxy> ts_proxy;
  RETURN_NOT_OK(BuildProxy(address, tserver::TabletServer::kDefaultPort,
                           &ts_proxy));

  vector<ListTabletsResponsePB::StatusAndSchemaPB> all_replicas;
  RETURN_NOT_OK(GetReplicas(ts_proxy.get(), &all_replicas));

  // Deserialize the schema of the first replica whose tablet ID matches; the
  // schema stays uninitialized when there is no match.
  Schema replica_schema;
  for (const auto& candidate : all_replicas) {
    if (candidate.tablet_status().tablet_id() != tablet_id) {
      continue;
    }
    RETURN_NOT_OK(SchemaFromPB(candidate.schema(), &replica_schema));
    break;
  }

  if (replica_schema.initialized()) {
    return ReplicaDumper::Dump(replica_schema, tablet_id, ts_proxy.get());
  }
  return Status::NotFound("cannot find replica", tablet_id);
}

// Action handler for "list": prints a multi-line description of each replica
// hosted on the tablet server given by kTServerAddressArg.
//
// Output is restricted to replicas of FLAGS_table_name and/or to the tablet
// IDs in the comma-separated FLAGS_tablets when those flags are non-empty.
// The schema is printed only when FLAGS_include_schema is set. Returns a
// non-OK status if a replica's schema or partition schema cannot be
// deserialized.
Status ListReplicas(const RunnerContext& context) {
  const string& address = FindOrDie(context.required_args, kTServerAddressArg);
  unique_ptr<TabletServerServiceProxy> ts_proxy;
  RETURN_NOT_OK(BuildProxy(address, tserver::TabletServer::kDefaultPort,
                           &ts_proxy));

  vector<ListTabletsResponsePB::StatusAndSchemaPB> all_replicas;
  RETURN_NOT_OK(GetReplicas(ts_proxy.get(), &all_replicas));

  unordered_set<string> wanted_tablet_ids = strings::Split(FLAGS_tablets, ",");
  for (const auto& replica : all_replicas) {
    const TabletStatusPB& ts = replica.tablet_status();

    // Apply the optional table-name and tablet-ID filters.
    const bool wrong_table =
        !FLAGS_table_name.empty() && ts.table_name() != FLAGS_table_name;
    if (wrong_table) {
      continue;
    }
    const bool unwanted_tablet =
        !FLAGS_tablets.empty() &&
        !ContainsKey(wanted_tablet_ids, ts.tablet_id());
    if (unwanted_tablet) {
      continue;
    }

    // The schema is decoded regardless of FLAGS_include_schema because it is
    // needed to decode and render the partition.
    Schema schema;
    RETURN_NOT_OK_PREPEND(
        SchemaFromPB(replica.schema(), &schema),
        "Unable to deserialize schema from " + address);
    PartitionSchema partition_schema;
    RETURN_NOT_OK_PREPEND(
        PartitionSchema::FromPB(replica.partition_schema(), schema,
                                &partition_schema),
        "Unable to deserialize partition schema from " + address);

    Partition partition;
    Partition::FromPB(ts.partition(), &partition);

    cout << "Tablet id: " << ts.tablet_id() << endl;
    cout << "State: " << tablet::TabletStatePB_Name(ts.state()) << endl;
    cout << "Last status: " << ts.last_status() << endl;
    if (replica.has_role()) {
      cout << "Role: " << RaftPeerPB::Role_Name(replica.role()) << endl;
    }
    cout << "Table name: " << ts.table_name() << endl;
    cout << "Partition: "
         << partition_schema.PartitionDebugString(partition, schema) << endl;
    if (ts.has_estimated_on_disk_size()) {
      cout << "Estimated on disk size: "
           << HumanReadableNumBytes::ToString(ts.estimated_on_disk_size())
           << endl;
    }
    cout << "Data state: "
         << tablet::TabletDataState_Name(ts.tablet_data_state()) << endl;
    if (ts.data_dirs_size() == 0) {
      cout << "Data dirs: <not available>" << endl;
    } else {
      cout << "Data dirs: " << JoinStrings(ts.data_dirs(), ", ") << endl;
    }
    if (FLAGS_include_schema) {
      cout << "Schema: " << schema.ToString() << endl;
    }
  }

  return Status::OK();
}

// Action handler for "copy": asks the tablet server at kDstAddressArg to
// start a tablet copy of the replica named by kTabletIdArg from the tablet
// server at kSrcAddressArg.
//
// The status of both servers is fetched first: the destination's permanent
// UUID addresses the request, and the source's permanent UUID and first bound
// RPC address tell the destination where to copy from.
//
// Returns IllegalState if the source server reports no bound RPC address, the
// RPC failure if the call itself fails (including when it exceeds
// FLAGS_timeout_ms), or the status embedded in the response, prefixed with
// the remote error code, if the destination server reports an error.
Status CopyReplica(const RunnerContext& context) {
  const string& src_address = FindOrDie(context.required_args, kSrcAddressArg);
  const string& dst_address = FindOrDie(context.required_args, kDstAddressArg);
  const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);

  ServerStatusPB dst_status;
  RETURN_NOT_OK(GetServerStatus(dst_address, TabletServer::kDefaultPort,
                                &dst_status));
  ServerStatusPB src_status;
  RETURN_NOT_OK(GetServerStatus(src_address, TabletServer::kDefaultPort,
                                &src_status));

  // Indexing an empty repeated field is invalid, so make sure the source
  // server actually advertised an address before reading the first one.
  if (src_status.bound_rpc_addresses_size() == 0) {
    return Status::IllegalState(
        "Source tablet server reported no bound RPC addresses", src_address);
  }

  unique_ptr<ConsensusServiceProxy> proxy;
  RETURN_NOT_OK(BuildProxy(dst_address, TabletServer::kDefaultPort, &proxy));

  StartTabletCopyRequestPB req;
  StartTabletCopyResponsePB resp;
  RpcController rpc;
  // Honor --timeout_ms like every other RPC issued by this tool.
  rpc.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
  req.set_dest_uuid(dst_status.node_instance().permanent_uuid());
  req.set_tablet_id(tablet_id);
  req.set_copy_peer_uuid(src_status.node_instance().permanent_uuid());
  *req.mutable_copy_peer_addr() = src_status.bound_rpc_addresses(0);
  // Provide a force option if the destination tablet server has the
  // replica otherwise tablet-copy will fail.
  if (FLAGS_force_copy) {
    req.set_caller_term(std::numeric_limits<int64_t>::max());
  }

  LOG(INFO) << "Sending copy replica request:\n" << pb_util::SecureDebugString(req);
  LOG(WARNING) << "NOTE: this copy may happen asynchronously "
               << "and may timeout if the tablet size is large. Watch the logs on "
               << "the target tablet server for indication of progress.";

  RETURN_NOT_OK(proxy->StartTabletCopy(req, &resp, &rpc));
  if (resp.has_error()) {
    RETURN_NOT_OK_PREPEND(
        StatusFromPB(resp.error().status()),
        strings::Substitute("Remote server returned error code $0",
                            TabletServerErrorPB::Code_Name(resp.error().code())));
  }
  return Status::OK();
}

// Action handler for "unsafe_change_config": sends an UnsafeChangeConfig
// request for the tablet named by kTabletIdArg to the tablet server given by
// kTServerAddressArg. The new Raft config consists of one peer per variadic
// argument, each identified only by its permanent UUID.
//
// Returns InvalidArgument if no peer UUIDs were supplied, or the status
// embedded in the response if the server reports an error.
Status UnsafeChangeConfig(const RunnerContext& context) {
  // Parse and validate arguments.
  const string& dst_address = FindOrDie(context.required_args, kTServerAddressArg);
  const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
  ServerStatusPB dst_status;
  RETURN_NOT_OK(GetServerStatus(dst_address, TabletServer::kDefaultPort,
                                &dst_status));

  const auto& peer_uuids = context.variadic_args;
  if (peer_uuids.empty()) {
    return Status::InvalidArgument("No peer UUIDs specified for the new config");
  }

  unique_ptr<ConsensusServiceProxy> consensus_proxy;
  RETURN_NOT_OK(BuildProxy(dst_address, TabletServer::kDefaultPort,
                           &consensus_proxy));

  // Assemble the request, building the new config directly inside it.
  consensus::UnsafeChangeConfigRequestPB change_req;
  change_req.set_dest_uuid(dst_status.node_instance().permanent_uuid());
  change_req.set_tablet_id(tablet_id);
  change_req.set_caller_id("kudu-tools");
  RaftConfigPB* new_config = change_req.mutable_new_config();
  for (const auto& uuid : peer_uuids) {
    new_config->add_peers()->set_permanent_uuid(uuid);
  }

  // Send the request to replace the config to node dst_address.
  RpcController controller;
  controller.set_timeout(MonoDelta::FromMilliseconds(FLAGS_timeout_ms));
  consensus::UnsafeChangeConfigResponsePB change_resp;
  RETURN_NOT_OK(consensus_proxy->UnsafeChangeConfig(change_req, &change_resp,
                                                    &controller));
  if (change_resp.has_error()) {
    return StatusFromPB(change_resp.error().status());
  }
  return Status::OK();
}

} // anonymous namespace

// Builds the "remote_replica" mode, registering the check, copy, delete,
// dump, list and unsafe_change_config actions in that order, each wired to
// its handler function and described by its help text and parameters.
unique_ptr<Mode> BuildRemoteReplicaMode() {
  // "check": takes no parameters beyond what TServerActionBuilder supplies.
  unique_ptr<Action> check_action =
      TServerActionBuilder("check", &CheckReplicas)
      .Description("Check if all tablet replicas on a Kudu tablet server are "
                   "running. Tombstoned replica do not count as not running, "
                   "because they are just records of the previous existence of "
                   "a replica.")
      .Build();

  // "copy": the only action built with RpcActionBuilder; it declares its own
  // source and destination address parameters.
  unique_ptr<Action> copy_action =
      RpcActionBuilder("copy", &CopyReplica)
      .Description("Copy a tablet replica from one Kudu Tablet Server to another")
      .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
      .AddRequiredParameter({ kSrcAddressArg, kTServerAddressDesc })
      .AddRequiredParameter({ kDstAddressArg, kTServerAddressDesc })
      .AddOptionalParameter("force_copy")
      .Build();

  // "delete": requires the tablet ID and a reason.
  unique_ptr<Action> delete_action =
      TServerActionBuilder("delete", &DeleteReplica)
      .Description("Delete a tablet replica from a Kudu Tablet Server")
      .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
      .AddRequiredParameter({ kReasonArg, "Reason for deleting the replica" })
      .Build();

  // "dump": requires the tablet ID.
  unique_ptr<Action> dump_action =
      TServerActionBuilder("dump", &DumpReplica)
      .Description("Dump the data of a tablet replica on a Kudu Tablet Server")
      .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
      .Build();

  // "list": all parameters are optional filters or output toggles.
  unique_ptr<Action> list_action =
      TServerActionBuilder("list", &ListReplicas)
      .Description("List all tablet replicas on a Kudu Tablet Server")
      .AddOptionalParameter("include_schema")
      .AddOptionalParameter("table_name")
      .AddOptionalParameter("tablets",
                            string(""),
                            string("Comma-separated list of tablet IDs used to "
                                   "filter the list of replicas"))
      .Build();

  // "unsafe_change_config": requires the tablet ID followed by one or more
  // peer UUIDs.
  unique_ptr<Action> change_config_action =
      TServerActionBuilder("unsafe_change_config", &UnsafeChangeConfig)
      .Description("Force the specified replica to adopt a new Raft config")
      .ExtraDescription("This tool is useful when a config change is "
                        "necessary because a tablet cannot make progress with "
                        "its current Raft configuration (e.g. to evict "
                        "followers when a majority is unavailable).\n\nNote: "
                        "The members of the new Raft config must be a subset "
                        "of (or the same as) the members of the existing "
                        "committed Raft config.")
      .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
      .AddRequiredVariadicParameter({ kPeerUUIDsArg, kPeerUUIDsArgDesc })
      .Build();

  return ModeBuilder("remote_replica")
      .Description("Operate on remote tablet replicas on a Kudu Tablet Server")
      .AddAction(std::move(check_action))
      .AddAction(std::move(copy_action))
      .AddAction(std::move(delete_action))
      .AddAction(std::move(dump_action))
      .AddAction(std::move(list_action))
      .AddAction(std::move(change_config_action))
      .Build();
}

} // namespace tools
} // namespace kudu

