// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tserver/tablet_copy_source_session.h"

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <optional>
#include <ostream>
#include <type_traits>
#include <vector>

#include <gflags/gflags.h>

#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/consensus_meta.h"
#include "kudu/consensus/consensus_meta_manager.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log.pb.h"
#include "kudu/consensus/log_index.h"
#include "kudu/consensus/log_reader.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/data_dirs.h"
#include "kudu/fs/fs.pb.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/basictypes.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/type_traits.h"
#include "kudu/rpc/transfer.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/monotime.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/slice.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/trace.h"

// Upper bound, in bytes, on a single data chunk returned by a tablet copy
// source session; see DetermineReadLength() for how it is applied.
DEFINE_int32(tablet_copy_transfer_chunk_size_bytes, 4 * 1024 * 1024,
             "Size of chunks to transfer when copying tablets between "
             "tablet servers.");
TAG_FLAG(tablet_copy_transfer_chunk_size_bytes, hidden);

// Metric prototypes instantiated by the TabletCopySourceMetrics constructor.
METRIC_DEFINE_counter(server, tablet_copy_bytes_sent,
                      "Bytes Sent For Tablet Copy",
                      kudu::MetricUnit::kBytes,
                      "Number of bytes sent during tablet copy operations since server start",
                      kudu::MetricLevel::kDebug);

// NOTE(review): the label below reads "Table Copy" while the description and
// the metric name say "tablet copy" -- possibly a typo. The label is a runtime
// string, so confirm nothing depends on it before changing it.
METRIC_DEFINE_gauge_int32(server, tablet_copy_open_source_sessions,
                          "Open Table Copy Source Sessions",
                          kudu::MetricUnit::kSessions,
                          "Number of currently open tablet copy source sessions on this server",
                          kudu::MetricLevel::kInfo);

// Test-only hook: RemoteTabletCopySourceSession::InitOnce() sleeps for this
// many milliseconds when the value is greater than zero.
DEFINE_int32(tablet_copy_session_inject_latency_on_init_ms, 0,
             "How much latency (in ms) to inject when a tablet copy session is initialized. "
             "(For testing only!)");
TAG_FLAG(tablet_copy_session_inject_latency_on_init_ms, unsafe);
TAG_FLAG(tablet_copy_session_inject_latency_on_init_ms, hidden);

namespace kudu {
namespace tserver {

using consensus::MinimumOpId;
using consensus::OpId;
using fs::ReadableBlock;
using log::ReadableLogSegment;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
using tablet::TabletMetadata;
using tablet::TabletReplica;

// Instantiates the tablet copy source metrics on 'metric_entity': the
// bytes-sent counter and the open-source-sessions gauge. The gauge is
// instantiated with 0 as its second argument (presumably its initial value --
// confirm against the metrics API).
TabletCopySourceMetrics::TabletCopySourceMetrics(const scoped_refptr<MetricEntity>& metric_entity)
    : bytes_sent(METRIC_tablet_copy_bytes_sent.Instantiate(metric_entity)),
      open_source_sessions(METRIC_tablet_copy_open_source_sessions.Instantiate(metric_entity, 0)) {
}

// Creates a session for 'tablet_id'. 'tablet_copy_metrics' may be null; when
// it is provided, the open-source-sessions gauge is incremented here and
// decremented again by the destructor.
TabletCopySourceSession::TabletCopySourceSession(
    string tablet_id,
    FsManager* fs_manager,
    TabletCopySourceMetrics* tablet_copy_metrics)
    : tablet_id_(std::move(tablet_id)),
      fs_manager_(fs_manager),
      blocks_deleter_(&blocks_),
      logs_deleter_(&logs_),
      tablet_copy_metrics_(tablet_copy_metrics) {
  // Metrics are optional: nothing to account for without them.
  if (tablet_copy_metrics_ == nullptr) {
    return;
  }
  tablet_copy_metrics_->open_source_sessions->Increment();
}

// Undoes the gauge increment performed by the constructor, if metrics were
// supplied.
TabletCopySourceSession::~TabletCopySourceSession() {
  if (tablet_copy_metrics_ == nullptr) {
    return;
  }
  tablet_copy_metrics_->open_source_sessions->IncrementBy(-1);
}

// Initializes the session by routing InitOnce() through 'init_once_' and
// returning the resulting Status.
Status TabletCopySourceSession::Init() {
  Status init_status = init_once_.Init(&TabletCopySourceSession::InitOnce, this);
  return init_status;
}

// Performs the one-time setup of a session that copies from a tablet replica.
//
// In order, this:
//   1. optionally injects test latency,
//   2. verifies the replica is running and its directory group is healthy,
//   3. registers a log anchor at the minimum OpId,
//   4. reads the superblock from disk and opens every data block it lists,
//   5. records the last received OpId,
//   6. snapshots and opens the current log segments,
//   7. captures the consensus state, and
//   8. moves the log anchor to the OpId recorded in step 5.
//
// Returns a non-OK Status as soon as any step fails. An anchor registered
// before the failure is not released here; the destructor unregisters it.
Status RemoteTabletCopySourceSession::InitOnce() {
  // Inject latency during Init() for testing purposes.
  if (PREDICT_FALSE(FLAGS_tablet_copy_session_inject_latency_on_init_ms > 0)) {
    TRACE("Injecting $0ms of latency due to --tablet_copy_session_inject_latency_on_init_ms",
          FLAGS_tablet_copy_session_inject_latency_on_init_ms);
    SleepFor(MonoDelta::FromMilliseconds(FLAGS_tablet_copy_session_inject_latency_on_init_ms));
  }

  RETURN_NOT_OK(tablet_replica_->CheckRunning());
  RETURN_NOT_OK(CheckHealthyDirGroup());

  // Prevent log GC while we grab log segments and Tablet metadata.
  const string anchor_owner_token = Substitute("TabletCopy-$0", session_id_);
  tablet_replica_->log_anchor_registry()->Register(
      MinimumOpId().index(), anchor_owner_token, &log_anchor_);

  // Read the SuperBlock from disk.
  const scoped_refptr<TabletMetadata>& metadata = tablet_replica_->tablet_metadata();
  RETURN_NOT_OK_PREPEND(metadata->ReadSuperBlockFromDisk(&tablet_superblock_),
                        Substitute("Unable to access superblock for tablet $0",
                                   tablet_id_));

  // Anchor the data blocks by opening them and adding them to the cache.
  //
  // All subsequent requests should reuse the opened blocks.
  vector<BlockIdPB> data_blocks =
      TabletMetadata::CollectBlockIdPBs(tablet_superblock_);
  for (const BlockIdPB& block_id : data_blocks) {
    VLOG(1) << "Opening block " << pb_util::SecureDebugString(block_id);
    RETURN_NOT_OK(OpenBlock(BlockId::FromPB(block_id)));
  }

  // Take a shared reference to Raft consensus and validate it before its
  // first use. The reference is held for the rest of this function so the
  // same object serves both the OpId lookup below and the consensus state
  // lookup further down.
  shared_ptr<consensus::RaftConsensus> raft_consensus = tablet_replica_->shared_consensus();
  if (!raft_consensus) {
    tablet::TabletStatePB tablet_state = tablet_replica_->state();
    return Status::IllegalState(Substitute(
        "Unable to initialize tablet copy session for tablet $0. "
        "Raft Consensus is not available. Tablet state: $1 ($2)",
        tablet_id_, tablet::TabletStatePB_Name(tablet_state), tablet_state));
  }

  // Get the latest opid in the log at this point in time so we can re-anchor.
  // If no OpId is available, fall back to the minimum OpId.
  // TODO(mpercy): Do we need special handling for std::nullopt case?
  std::optional<OpId> last_logged_opid =
      raft_consensus->GetLastOpId(consensus::RECEIVED_OPID);
  if (!last_logged_opid) {
    last_logged_opid = MinimumOpId();
  }

  // Get the current segments from the log, including the active segment.
  // The Log doesn't add the active segment to the log reader's list until
  // a header has been written to it (but it will not have a footer).
  shared_ptr<log::LogReader> reader = tablet_replica_->log()->reader();
  if (!reader) {
    tablet::TabletStatePB tablet_state = tablet_replica_->state();
    return Status::IllegalState(Substitute(
        "Unable to initialize tablet copy session for tablet $0. "
        "Log reader is not available. Tablet state: $1 ($2)",
        tablet_id_, tablet::TabletStatePB_Name(tablet_state), tablet_state));
  }
  reader->GetSegmentsSnapshot(&log_segments_);
  for (const scoped_refptr<ReadableLogSegment>& segment : log_segments_) {
    RETURN_NOT_OK(OpenLogSegment(segment->header().sequence_number()));
  }

  // Look up the committed consensus state.
  // We do this after snapshotting the log to avoid a scenario where the latest
  // entry in the log has a term higher than the term stored in the consensus
  // metadata, which would result in a CHECK failure on RaftConsensus init.
  RETURN_NOT_OK_PREPEND(raft_consensus->ConsensusState(&initial_cstate_),
                        "Consensus state not available");

  // Re-anchor on the highest OpId that was in the log right before we
  // snapshotted the log segments. This helps ensure that we don't end up in a
  // tablet copy loop due to a follower falling too far behind the
  // leader's log when tablet copy is slow. The remote controls when
  // this anchor is released by ending the tablet copy session.
  RETURN_NOT_OK(tablet_replica_->log_anchor_registry()->RegisterOrUpdate(
      last_logged_opid->index(), anchor_owner_token, &log_anchor_));

  LOG(INFO) << Substitute(
      "T $0 P $1: Tablet Copy: opened $2 blocks and $3 log segments",
      tablet_id_, raft_consensus->peer_uuid(), data_blocks.size(), log_segments_.size());
  return Status::OK();
}

// Computes how many bytes to return to the client for one chunk.
//
// A non-positive 'requested_len' means "no client preference" and selects the
// configured chunk size; a positive one is capped at that chunk size. The
// result is further capped so the chunk plus RPC overhead fits in the maximum
// RPC message size, and never exceeds 'bytes_remaining'.
static int64_t DetermineReadLength(int64_t bytes_remaining, int64_t requested_len) {
  // Overhead in the RPC for things like headers, protobuf data, etc.
  static const int kOverhead = 4096;

  const int64_t chunk_cap = FLAGS_tablet_copy_transfer_chunk_size_bytes;
  int64_t read_len = (requested_len <= 0)
      ? chunk_cap
      : std::min<int64_t>(requested_len, chunk_cap);

  read_len = std::min<int64_t>(read_len, FLAGS_rpc_max_message_size - kOverhead);
  CHECK_GT(read_len, 0) << "rpc_max_message_size is too low to transfer data: "
                                     << FLAGS_rpc_max_message_size;
  return std::min(bytes_remaining, read_len);
}

// Works out how many bytes of a 'total_size'-byte object to return for a read
// starting at 'offset', honoring the client's maximum length 'client_maxlen'.
//
// On success stores the size in '*data_size'. If 'offset' is at or past the
// end of the data, sets '*error_code' to INVALID_TABLET_COPY_REQUEST and
// returns InvalidArgument without touching '*data_size'.
static Status GetResponseDataSize(int64_t total_size,
                                  uint64_t offset, int64_t client_maxlen,
                                  TabletCopyErrorPB::Code* error_code, int64_t* data_size) {
  const bool offset_in_range = offset < total_size;
  if (!offset_in_range) {
    // Requested offset is off the end of the data: reject the request.
    *error_code = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
    return Status::InvalidArgument(
        Substitute("Requested offset ($0) is beyond the data size ($1)",
                   offset, total_size));
  }

  const int64_t bytes_remaining = total_size - offset;
  *data_size = DetermineReadLength(bytes_remaining, client_maxlen);

  DCHECK_GT(*data_size, 0);
  if (client_maxlen > 0) {
    DCHECK_LE(*data_size, client_maxlen);
  }
  return Status::OK();
}

// Reads one chunk of the object described by 'info' into '*data'.
//
// 'offset' is the position to read from and 'client_maxlen' the client's
// maximum chunk length. 'data_name' names the block/log segment in error and
// trace messages. On success '*data' holds the chunk and '*file_size' is set
// to 'info->size'. On failure '*error_code' is set and a non-OK Status is
// returned; '*file_size' is left untouched.
template <class Info>
static Status ReadFileChunkToBuf(const Info* info,
                                 uint64_t offset, int64_t client_maxlen,
                                 const string& data_name,
                                 string* data, int64_t* file_size,
                                 TabletCopyErrorPB::Code* error_code) {
  int64_t chunk_len = 0;
  RETURN_NOT_OK_PREPEND(GetResponseDataSize(info->size, offset, client_maxlen, error_code,
                                            &chunk_len),
                        Substitute("Error reading $0", data_name));

  Stopwatch read_timer(Stopwatch::THIS_THREAD);
  read_timer.start();

  // Size the string up front and read straight into its storage, so the
  // chunk does not have to be staged in a separate buffer and copied over.
  data->resize(chunk_len);
  uint8_t* dest = reinterpret_cast<uint8_t*>(&(*data)[0]);
  Slice chunk(dest, chunk_len);
  Status read_status = info->Read(offset, chunk);
  if (PREDICT_FALSE(!read_status.ok())) {
    read_status = read_status.CloneAndPrepend(
        Substitute("Unable to read existing file for $0", data_name));
    LOG(WARNING) << read_status.ToString();
    *error_code = TabletCopyErrorPB::IO_ERROR;
    return read_status;
  }

  // The slice may end up pointing somewhere other than our buffer (e.g. at
  // an mmap); in that case copy the bytes into the buffer.
  const bool read_landed_elsewhere = chunk.data() != dest;
  if (read_landed_elsewhere) {
    memcpy(dest, chunk.data(), chunk.size());
  }

  read_timer.stop();
  TRACE("Tablet Copy: $0: $1 total bytes read. Total time elapsed: $2",
        data_name, chunk_len, read_timer.elapsed().ToString());

  *file_size = info->size;
  return Status::OK();
}

// Reads a chunk of the data block 'block_id', starting at 'offset' and
// limited by 'client_maxlen', into '*data'. '*block_file_size' receives the
// block's total size. On failure '*error_code' describes the problem.
//
// Must only be called after a successful Init().
Status TabletCopySourceSession::GetBlockPiece(const BlockId& block_id,
                                             uint64_t offset, int64_t client_maxlen,
                                             string* data, int64_t* block_file_size,
                                             TabletCopyErrorPB::Code* error_code) {
  DCHECK(init_once_.init_succeeded());
  RETURN_NOT_OK_PREPEND(CheckHealthyDirGroup(error_code),
                        "Tablet copy source could not get block");

  ImmutableReadableBlockInfo* cached_block = nullptr;
  RETURN_NOT_OK(FindBlock(block_id, &cached_block, error_code));

  const string block_name = Substitute("block $0", block_id.ToString());
  RETURN_NOT_OK(ReadFileChunkToBuf(cached_block, offset, client_maxlen,
                                   block_name,
                                   data, block_file_size, error_code));

  // The block is deliberately left open. Closing it eagerly may delete the
  // underlying data if this was the last reader of a block already marked
  // for deletion, and parallel readers in the same session would then fail
  // to find it.
  return Status::OK();
}

// Reads a chunk of the log segment with sequence number 'segment_seqno' into
// '*data'.
//
// 'offset' is relative to the end of the file's encryption header: the header
// size is added before reading and subtracted again from the size reported in
// '*log_file_size'. 'client_maxlen' limits the chunk length. On failure
// '*error_code' describes the problem; an 'offset' so large that adding the
// header size would wrap around is rejected as INVALID_TABLET_COPY_REQUEST.
//
// Must only be called after a successful Init().
Status TabletCopySourceSession::GetLogSegmentPiece(uint64_t segment_seqno,
                                                   uint64_t offset, int64_t client_maxlen,
                                                   string* data, int64_t* log_file_size,
                                                   TabletCopyErrorPB::Code* error_code) {
  DCHECK(init_once_.init_succeeded());
  RETURN_NOT_OK_PREPEND(CheckHealthyDirGroup(error_code),
                        "Tablet copy source could not get log segment");
  ImmutableRWFileInfo* file_info = nullptr;
  RETURN_NOT_OK(FindLogSegment(segment_seqno, &file_info, error_code));
  CHECK(file_info);

  // To make sure unencrypted tservers can copy tablets from encrypted tservers
  // and vice versa (which is a valid scenario when encrypting an existing
  // cluster in place), we add the encryption header size here and start at 0 at
  // the client side.
  const uint64_t header_size = file_info->readable->GetEncryptionHeaderSize();
  const uint64_t file_offset = offset + header_size;

  // Unsigned addition wraps silently. Without this guard a huge requested
  // offset could wrap to a small file position and pass the bounds check.
  if (PREDICT_FALSE(file_offset < offset)) {
    *error_code = TabletCopyErrorPB::INVALID_TABLET_COPY_REQUEST;
    return Status::InvalidArgument(
        Substitute("Requested offset ($0) is too large for log segment $1",
                   offset, segment_seqno));
  }

  RETURN_NOT_OK(ReadFileChunkToBuf(file_info, file_offset, client_maxlen,
                                   Substitute("log segment $0", segment_seqno),
                                   data, log_file_size, error_code));
  // Report the size as seen by the client, i.e. without the header.
  *log_file_size -= header_size;

  // Note: We do not eagerly close log segment files, since we share ownership
  // of the LogSegment objects with the Log itself.
  return Status::OK();
}

// Test helper: reports whether 'block_id' is present in the session's cache
// of opened blocks. Must only be called after a successful Init().
bool TabletCopySourceSession::IsBlockOpenForTests(const BlockId& block_id) const {
  DCHECK(init_once_.init_succeeded());
  const bool is_cached = blocks_.find(block_id) != blocks_.end();
  return is_cached;
}

// Adds an entry for 'key' to 'cache': a newly allocated info object (of the
// type pointed to by the map's mapped_type) built from 'readable' and 'size'.
//
// Returns Corruption, without touching the cache, when 'size' is 0. The
// insertion itself uses InsertOrDie().
template <class Collection, class Key, class Readable>
static Status AddImmutableFileToMap(Collection* const cache,
                                    const Key& key,
                                    const Readable& readable,
                                    uint64_t size) {
  if (size != 0) {
    // Derive the pointee type from the map so the right info object is built.
    using InfoPtr = typename Collection::mapped_type;
    using Info = typename base::remove_pointer<InfoPtr>::type;
    InsertOrDie(cache, key, new Info(readable, size));
    return Status::OK();
  }

  // Sanity check: 0-length files are not expected.
  return Status::Corruption("Found 0-length object");
}

// Returns OK unless the data directory manager reports the tablet as being in
// a failed directory; in that case returns IOError and, when 'error_code' is
// non-null, sets it to IO_ERROR.
Status TabletCopySourceSession::CheckHealthyDirGroup(TabletCopyErrorPB::Code* error_code) const {
  const bool in_failed_dir = fs_manager_->dd_manager()->IsTabletInFailedDir(tablet_id_);
  if (!in_failed_dir) {
    return Status::OK();
  }

  // The error code is an optional out-parameter.
  if (error_code != nullptr) {
    *error_code = TabletCopyErrorPB::IO_ERROR;
  }
  return Status::IOError(
      Substitute("Tablet $0 is in a failed directory", tablet_id_));
}

// Opens the data block 'block_id' through the FsManager, looks up its size,
// and records it in the session's block cache. Returns a non-OK Status if the
// block cannot be opened, its size cannot be read, or it cannot be cached
// (e.g. because it is empty).
Status TabletCopySourceSession::OpenBlock(const BlockId& block_id) {
  unique_ptr<ReadableBlock> opened;
  Status open_status = fs_manager_->OpenBlock(block_id, &opened);
  if (PREDICT_FALSE(!open_status.ok())) {
    LOG(WARNING) << "Unable to open requested (existing) block file: "
                 << block_id.ToString() << ": " << open_status.ToString();
    return open_status.CloneAndPrepend(Substitute("Unable to open block file for block $0",
                                                  block_id.ToString()));
  }

  uint64_t block_size = 0;
  Status size_status = opened->Size(&block_size);
  if (PREDICT_FALSE(!size_status.ok())) {
    return size_status.CloneAndPrepend("Unable to get size of block");
  }

  Status add_status = AddImmutableFileToMap(&blocks_, block_id, opened.get(), block_size);
  if (add_status.ok()) {
    // The cache entry now refers to the block, so the unique_ptr gives up
    // ownership instead of closing it on scope exit.
    ignore_result(opened.release());
    return add_status;
  }

  add_status = add_status.CloneAndPrepend(
      Substitute("Error accessing data for block $0", block_id.ToString()));
  LOG(DFATAL) << "Data block disappeared: " << add_status.ToString();
  return add_status;
}

// Looks up 'block_id' in the session's block cache. On a hit, copies the
// cached pointer into '*block_info' and returns OK. On a miss, sets
// '*error_code' to BLOCK_NOT_FOUND and returns NotFound.
Status TabletCopySourceSession::FindBlock(const BlockId& block_id,
                                         ImmutableReadableBlockInfo** block_info,
                                         TabletCopyErrorPB::Code* error_code) {
  const bool found = FindCopy(blocks_, block_id, block_info);
  if (found) {
    return Status::OK();
  }
  *error_code = TabletCopyErrorPB::BLOCK_NOT_FOUND;
  return Status::NotFound("Block not found", block_id.ToString());
}

// Records the log segment with sequence number 'segment_seqno' in the
// session's log cache, keyed by that sequence number.
//
// The segment is located in 'log_segments_' by its distance from the first
// segment's sequence number, so the lookup relies on the snapshot holding
// consecutive sequence numbers (enforced by the CHECK_EQ below). Returns
// NotFound if the sequence number is outside the snapshot, or the error from
// caching the segment (e.g. when nothing is readable in it yet).
Status TabletCopySourceSession::OpenLogSegment(uint64_t segment_seqno) {
  // Do the range check entirely in unsigned 64-bit arithmetic. Narrowing the
  // difference to 'int' could alias a far-away sequence number onto a valid
  // index.
  bool in_snapshot = false;
  uint64_t position = 0;
  if (!log_segments_.empty()) {
    const uint64_t first_seqno = log_segments_[0]->header().sequence_number();
    if (segment_seqno >= first_seqno &&
        segment_seqno - first_seqno < log_segments_.size()) {
      position = segment_seqno - first_seqno;
      in_snapshot = true;
    }
  }
  if (!in_snapshot) {
    return Status::NotFound(Substitute("Segment with sequence number $0 not found",
                                       segment_seqno));
  }

  const scoped_refptr<log::ReadableLogSegment>& log_segment = log_segments_[position];
  CHECK_EQ(log_segment->header().sequence_number(), segment_seqno);

  // Only the portion of the segment that is currently readable is served.
  uint64_t size = log_segment->readable_up_to();
  Status s = AddImmutableFileToMap(&logs_, segment_seqno, log_segment->file(), size);
  if (!s.ok()) {
    s = s.CloneAndPrepend(
            Substitute("Error accessing data for log segment with seqno $0",
                       segment_seqno));
    LOG(INFO) << s.ToString();
  }
  return s;
}

// Looks up 'segment_seqno' in the session's log segment cache. On a hit,
// copies the cached pointer into '*file_info' and returns OK. On a miss, sets
// '*error_code' to WAL_SEGMENT_NOT_FOUND and returns NotFound.
Status TabletCopySourceSession::FindLogSegment(uint64_t segment_seqno,
                                              ImmutableRWFileInfo** file_info,
                                              TabletCopyErrorPB::Code* error_code) {
  const bool found = FindCopy(logs_, segment_seqno, file_info);
  if (found) {
    return Status::OK();
  }
  *error_code = TabletCopyErrorPB::WAL_SEGMENT_NOT_FOUND;
  return Status::NotFound(Substitute("Segment with sequence number $0 not found",
                                     segment_seqno));
}

// Creates a session that copies from the given TabletReplica.
//
// The base class is initialized with the replica's tablet ID. Base classes
// are initialized before members, so the ID is read from 'tablet_replica'
// before that argument is moved into 'tablet_replica_'.
// NOTE(review): 'tablet_replica' is dereferenced unconditionally, so callers
// presumably must pass a non-null replica -- confirm.
RemoteTabletCopySourceSession::RemoteTabletCopySourceSession(
    scoped_refptr<TabletReplica> tablet_replica,
    string session_id,
    string requestor_uuid,
    FsManager* fs_manager,
    TabletCopySourceMetrics* tablet_copy_metrics)
    : TabletCopySourceSession(tablet_replica->tablet_id(),
                              fs_manager,
                              tablet_copy_metrics),
      tablet_replica_(std::move(tablet_replica)),
      session_id_(std::move(session_id)),
      requestor_uuid_(std::move(requestor_uuid)) {
}

// Releases the session's log anchor, if one is held. A failure to unregister
// is fatal (CHECK_OK).
RemoteTabletCopySourceSession::~RemoteTabletCopySourceSession() {
  // No lock taken in the destructor, should only be 1 thread with access now.
  Status unregister_status = UnregisterAnchorIfNeededUnlocked();
  CHECK_OK(unregister_status);
}

// Asks the replica's log anchor registry to unregister this session's anchor
// if it is currently anchored, and returns the registry's Status.
Status RemoteTabletCopySourceSession::UnregisterAnchorIfNeededUnlocked() {
  auto* anchor_registry = tablet_replica_->log_anchor_registry().get();
  return anchor_registry->UnregisterIfAnchored(&log_anchor_);
}

// Creates a session identified only by 'tablet_id'. All arguments are
// forwarded unchanged to the TabletCopySourceSession constructor; this class
// initializes no members of its own here.
LocalTabletCopySourceSession::LocalTabletCopySourceSession(
    string tablet_id,
    FsManager* fs_manager,
    TabletCopySourceMetrics* tablet_copy_metrics)
    : TabletCopySourceSession(std::move(tablet_id),
                              fs_manager,
                              tablet_copy_metrics) {
}

// Performs the one-time setup of a session that reads the tablet directly
// from the local filesystem.
//
// Reads the superblock from the tablet metadata file, opens every data block
// it lists, opens a LogReader and caches the current log segments, then loads
// the consensus metadata to capture the consensus state. Returns the first
// non-OK Status encountered.
Status LocalTabletCopySourceSession::InitOnce() {
  RETURN_NOT_OK(CheckHealthyDirGroup());

  // Load the SuperBlock straight from the tablet metadata file.
  const string superblock_path = fs_manager_->GetTabletMetadataPath(tablet_id_);
  RETURN_NOT_OK_PREPEND(
      pb_util::ReadPBContainerFromPath(fs_manager_->env(), superblock_path,
                                       &tablet_superblock_, pb_util::SENSITIVE),
      Substitute("Unable to access superblock for tablet $0", tablet_id_));

  // Open every data block and cache it; all subsequent requests should reuse
  // the opened blocks.
  // NOTE: Since the server is stopped, neither data blocks nor log segments
  // need to be anchored.
  vector<BlockIdPB> block_pbs =
      TabletMetadata::CollectBlockIdPBs(tablet_superblock_);
  for (const auto& block_pb : block_pbs) {
    VLOG(1) << "Opening block " << pb_util::SecureDebugString(block_pb);
    RETURN_NOT_OK(OpenBlock(BlockId::FromPB(block_pb)));
  }

  // Snapshot the current log segments through a freshly opened reader.
  shared_ptr<log::LogReader> log_reader;
  RETURN_NOT_OK(log::LogReader::Open(fs_manager_,
                                     /*index*/nullptr,
                                     tablet_id_,
                                     /*metric_entity*/nullptr,
                                     /*file_cache*/nullptr,
                                     &log_reader));
  log_reader->GetSegmentsSnapshot(&log_segments_);
  for (const auto& log_segment : log_segments_) {
    RETURN_NOT_OK(OpenLogSegment(log_segment->header().sequence_number()));
  }

  // Capture the current consensus state from the on-disk consensus metadata.
  scoped_refptr<consensus::ConsensusMetadataManager> cmeta_manager(
      new consensus::ConsensusMetadataManager(fs_manager_));
  scoped_refptr<consensus::ConsensusMetadata> loaded_cmeta;
  RETURN_NOT_OK(cmeta_manager->Load(tablet_id_, &loaded_cmeta));
  initial_cstate_ = loaded_cmeta->ToConsensusStatePB();

  LOG(INFO) << Substitute(
      "T $0: Tablet Copy: opened $1 blocks and $2 log segments",
      tablet_id_, block_pbs.size(), log_segments_.size());
  return Status::OK();
}

} // namespace tserver
} // namespace kudu
