// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/tserver/tablet_copy_source_session.h"

#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <vector>

#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/common/common.pb.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/row_operations.pb.h"
#include "kudu/common/schema.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus_meta_manager.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/fastmem.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/result_tracker.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/tablet/ops/op.h"
#include "kudu/tablet/ops/write_op.h"
#include "kudu/tablet/tablet-test-util.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/tablet_copy.pb.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/crc.h"
#include "kudu/util/env.h"
#include "kudu/util/faststring.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/dns_resolver.h"
#include "kudu/util/path_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/threadpool.h"

METRIC_DECLARE_entity(tablet);

using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;

namespace kudu {

class BlockIdPB;

namespace tserver {

using consensus::ConsensusMetadataManager;
using consensus::RaftConfigPB;
using consensus::RaftPeerPB;
using consensus::kMinimumTerm;
using fs::BlockDeletionTransaction;
using fs::ReadableBlock;
using log::Log;
using log::LogOptions;
using log::LogAnchorRegistry;
using rpc::Messenger;
using rpc::MessengerBuilder;
using strings::Substitute;
using tablet::ColumnDataPB;
using tablet::DeltaDataPB;
using tablet::KuduTabletTest;
using tablet::RowSetDataPB;
using tablet::TabletReplica;
using tablet::TabletSuperBlockPB;
using tablet::WriteOpState;

// Test fixture that builds a single-replica, self-elected-leader tablet,
// inserts and flushes 1000 rows, and then opens a TabletCopySourceSession over
// it. The fixture is parameterized on TabletCopyMode so every test runs against
// both the remote and the local session implementations.
class TabletCopyTest : public KuduTabletTest,
                       public ::testing::WithParamInterface<TabletCopyMode> {
 public:
  TabletCopyTest()
      : KuduTabletTest(Schema({ ColumnSchema("key", STRING),
                                ColumnSchema("val", INT32) }, 1)),
        dns_resolver_(new DnsResolver) {
    CHECK_OK(ThreadPoolBuilder("prepare").Build(&prepare_pool_));
    CHECK_OK(ThreadPoolBuilder("apply").Build(&apply_pool_));
    CHECK_OK(ThreadPoolBuilder("raft").Build(&raft_pool_));
  }

  void SetUp() override {
    NO_FATALS(KuduTabletTest::SetUp());
    NO_FATALS(SetUpTabletReplica());
    NO_FATALS(PopulateTablet());
    NO_FATALS(InitSession());
  }

  void TearDown() override {
    // Release the session before shutting the replica down; the remote session
    // is constructed with a raw pointer to the replica (see InitSession()).
    session_.reset();
    // SetUp() may have failed before the replica was ever created (e.g. if the
    // log or consensus metadata could not be set up), so don't assume it exists.
    if (tablet_replica_) {
      tablet_replica_->Shutdown();
    }
    KuduTabletTest::TearDown();
  }

 protected:
  // Opens a WAL for the test tablet, creates a single-voter Raft config whose
  // only peer is this server, starts a TabletReplica on top of the tablet, and
  // waits until that replica's consensus is running and it has become leader.
  void SetUpTabletReplica() {
    scoped_refptr<Log> log;
    ASSERT_OK(Log::Open(LogOptions(),
                        fs_manager(),
                        /*file_cache=*/ nullptr,
                        tablet()->tablet_id(),
                        *tablet()->schema(),
                        /*schema_version=*/ 0,
                        /*metric_entity=*/ nullptr,
                        &log));

    scoped_refptr<MetricEntity> metric_entity =
      METRIC_ENTITY_tablet.Instantiate(&metric_registry_, CURRENT_TEST_NAME());

    // TODO(mpercy): Similar to code in tablet_replica-test, consider refactor.
    RaftConfigPB config;
    config.set_opid_index(consensus::kInvalidOpIdIndex);
    RaftPeerPB* config_peer = config.add_peers();
    config_peer->set_permanent_uuid(fs_manager()->uuid());
    config_peer->mutable_last_known_addr()->set_host("0.0.0.0");
    config_peer->mutable_last_known_addr()->set_port(0);
    config_peer->set_member_type(RaftPeerPB::VOTER);

    scoped_refptr<ConsensusMetadataManager> cmeta_manager(
        new ConsensusMetadataManager(fs_manager()));
    ASSERT_OK(cmeta_manager->Create(tablet()->tablet_id(), config, kMinimumTerm));

    const auto& tablet_id = tablet()->tablet_id();
    tablet_replica_.reset(
        new TabletReplica(tablet()->metadata(),
                          cmeta_manager,
                          *config_peer,
                          apply_pool_.get(),
                          /*reload_txn_status_tablet_pool*/nullptr,
                          /*txn_coordinator_factory*/nullptr,
                          [this, tablet_id](const string& reason) {
                            this->TabletReplicaStateChangedCallback(tablet_id, reason);
                          }));
    ASSERT_OK(tablet_replica_->Init({ /*quiescing*/nullptr,
                                      /*num_leaders*/nullptr,
                                      raft_pool_.get() }));

    // Don't silently continue with a null messenger if building it fails.
    shared_ptr<Messenger> messenger;
    MessengerBuilder mbuilder(CURRENT_TEST_NAME());
    ASSERT_OK(mbuilder.Build(&messenger));

    log_anchor_registry_.reset(new LogAnchorRegistry());
    tablet_replica_->SetBootstrapping();
    consensus::ConsensusBootstrapInfo boot_info;
    ASSERT_OK(tablet_replica_->Start(boot_info,
                                     tablet(),
                                     clock(),
                                     messenger,
                                     scoped_refptr<rpc::ResultTracker>(),
                                     log,
                                     prepare_pool_.get(),
                                     dns_resolver_.get()));
    ASSERT_OK(tablet_replica_->WaitUntilConsensusRunning(MonoDelta::FromSeconds(10)));
    ASSERT_OK(tablet_replica_->consensus()->WaitUntilLeader(MonoDelta::FromSeconds(10)));
  }

  // State-change callback handed to the TabletReplica; only logs the change.
  void TabletReplicaStateChangedCallback(const string& tablet_id, const string& reason) {
    LOG(INFO) << "Tablet replica state changed for tablet " << tablet_id << ". Reason: " << reason;
  }

  // Inserts rows ("key0", 0) .. ("key999", 999) one write op at a time through
  // the replica, waiting for each op to complete and asserting it succeeded,
  // then flushes the tablet so the data lands in on-disk blocks.
  void PopulateTablet() {
    for (int32_t i = 0; i < 1000; i++) {
      WriteRequestPB req;
      req.set_tablet_id(tablet_replica_->tablet_id());
      ASSERT_OK(SchemaToPB(client_schema_, req.mutable_schema()));
      RowOperationsPB* data = req.mutable_row_operations();
      RowOperationsPBEncoder enc(data);
      KuduPartialRow row(&client_schema_);

      // 'key' must outlive enc.Add() since SetStringNoCopy() doesn't copy it.
      string key = Substitute("key$0", i);
      ASSERT_OK(row.SetStringNoCopy(0, key));
      ASSERT_OK(row.SetInt32(1, i));
      enc.Add(RowOperationsPB::INSERT, row);

      WriteResponsePB resp;
      CountDownLatch latch(1);

      unique_ptr<tablet::WriteOpState> state(
          new tablet::WriteOpState(tablet_replica_.get(),
                                   &req,
                                   nullptr, // No RequestIdPB
                                   &resp));
      state->set_completion_callback(unique_ptr<tablet::OpCompletionCallback>(
          new tablet::LatchOpCompletionCallback<WriteResponsePB>(&latch, &resp)));
      ASSERT_OK(tablet_replica_->SubmitWrite(std::move(state)));
      latch.Wait();
      ASSERT_FALSE(resp.has_error())
          << "Request failed: " << pb_util::SecureShortDebugString(resp.error());
      ASSERT_EQ(0, resp.per_row_errors_size())
          << "Insert error: " << pb_util::SecureShortDebugString(resp);
    }
    ASSERT_OK(tablet()->Flush());
  }

  // Creates the session under test according to the test parameter and
  // initializes it.
  void InitSession() {
    if (GetParam() == TabletCopyMode::REMOTE) {
      session_.reset(new RemoteTabletCopySourceSession(
          tablet_replica_.get(), "TestSession", "FakeUUID",
          fs_manager(), nullptr /* no metrics */));
    } else {
      session_.reset(new LocalTabletCopySourceSession(
          tablet_replica_->tablet_id(), fs_manager(), nullptr /* no metrics */));
    }
    ASSERT_OK(session_->Init());
  }

  // Read the specified BlockId, via the TabletCopySourceSession, into a file.
  // 'path' will be populated with the name of the file used.
  // 'file' will be set to point to the SequentialFile containing the data.
  // The file is written and reopened with 'is_sensitive' set.
  void FetchBlockToFile(const BlockId& block_id,
                        string* path,
                        unique_ptr<SequentialFile>* file) {
    string data;
    int64_t block_file_size = 0;
    TabletCopyErrorPB::Code error_code;
    CHECK_OK(session_->GetBlockPiece(block_id, 0, 0, &data, &block_file_size, &error_code));
    if (block_file_size > 0) {
      // A non-empty block must yield a non-empty piece. Checking empty() avoids
      // the signed/unsigned comparison of CHECK_GT(size_t, int).
      CHECK(!data.empty());
    }

    // Write the file to a temporary location.
    WritableFileOptions opts;
    opts.is_sensitive = true;
    string path_template = GetTestPath(Substitute("test_block_$0$1.XXXXXX",
                                                  block_id.ToString(),
                                                  kTmpInfix));
    unique_ptr<WritableFile> writable_file;
    CHECK_OK(Env::Default()->NewTempWritableFile(opts, path_template, path, &writable_file));
    CHECK_OK(writable_file->Append(Slice(data.data(), data.size())));
    CHECK_OK(writable_file->Close());

    SequentialFileOptions seq_opts;
    seq_opts.is_sensitive = true;
    CHECK_OK(Env::Default()->NewSequentialFile(seq_opts, *path, file));
  }

  MetricRegistry metric_registry_;
  scoped_refptr<LogAnchorRegistry> log_anchor_registry_;
  unique_ptr<ThreadPool> prepare_pool_;
  unique_ptr<ThreadPool> apply_pool_;
  unique_ptr<ThreadPool> raft_pool_;
  unique_ptr<DnsResolver> dns_resolver_;
  scoped_refptr<TabletReplica> tablet_replica_;
  // The session under test; remote or local depending on GetParam().
  scoped_refptr<TabletCopySourceSession> session_;
};

// Run every TEST_P in this file twice: once with a remote copy session and
// once with a local copy session (see InitSession()).
INSTANTIATE_TEST_SUITE_P(
    TabletCopyTestModes,
    TabletCopyTest,
    ::testing::Values(TabletCopyMode::REMOTE, TabletCopyMode::LOCAL));

// Ensure that the serialized SuperBlock included in the TabletCopySourceSession is
// equal to the serialized live superblock (on a quiesced tablet).
TEST_P(TabletCopyTest, TestSuperBlocksEqual) {
  // Serializes 'pb' into 'buf', sizing the buffer to exactly the encoded length.
  // ByteSizeLong() also caches the sizes SerializeWithCachedSizesToArray() needs.
  const auto serialize = [](const TabletSuperBlockPB& pb, faststring* buf) {
    buf->resize(pb.ByteSizeLong());
    pb.SerializeWithCachedSizesToArray(buf->data());
  };

  // Superblock captured by the session.
  faststring session_buf;
  serialize(session_->tablet_superblock(), &session_buf);

  // Superblock as currently reported by the live tablet metadata.
  TabletSuperBlockPB tablet_superblock;
  ASSERT_OK(tablet()->metadata()->ToSuperBlock(&tablet_superblock));
  faststring tablet_buf;
  serialize(tablet_superblock, &tablet_buf);

  // Compare content of superblocks byte-for-byte. The length stays a size_t
  // rather than being narrowed to int before it is handed to the memcmp.
  ASSERT_EQ(session_buf.size(), tablet_buf.size());
  const size_t size = tablet_buf.size();
  ASSERT_EQ(0, strings::fastmemcmp_inlined(session_buf.data(), tablet_buf.data(), size));
}

// Test fetching all files from tablet server, ensure the checksums for each
// chunk and the total file sizes match.
TEST_P(TabletCopyTest, TestBlocksEqual) {
  TabletSuperBlockPB superblock;
  ASSERT_OK(tablet()->metadata()->ToSuperBlock(&superblock));
  // Only the column blocks of each rowset are compared here.
  for (const RowSetDataPB& rs : superblock.rowsets()) {
    for (const ColumnDataPB& col : rs.columns()) {
      BlockId id = BlockId::FromPB(col.block());

      // Pull the block through the session into a temp file, then checksum it.
      string session_path;
      unique_ptr<SequentialFile> session_file;
      FetchBlockToFile(id, &session_path, &session_file);
      uint64_t session_size = 0;
      ASSERT_OK(Env::Default()->GetFileSize(session_path, &session_size));
      faststring session_buf;
      session_buf.resize(session_size);
      Slice session_data(session_buf.data(), session_size);
      ASSERT_OK(session_file->Read(&session_data));
      const uint32_t session_crc = crc::Crc32c(session_data.data(), session_data.size());
      LOG(INFO) << "session block file has size of " << session_size
                << " and CRC32C of " << session_crc << ": " << session_path;

      // Read the same block directly from the tablet's block manager.
      unique_ptr<ReadableBlock> local_block;
      ASSERT_OK(fs_manager()->OpenBlock(id, &local_block));
      uint64_t local_size = 0;
      ASSERT_OK(local_block->Size(&local_size));
      faststring local_buf;
      local_buf.resize(local_size);
      Slice local_data(local_buf.data(), local_size);
      ASSERT_OK(local_block->Read(0, local_data));
      const uint32_t local_crc = crc::Crc32c(local_data.data(), local_data.size());
      LOG(INFO) << "tablet block file has size of " << local_size
                << " and CRC32C of " << local_crc
                << ": " << id;

      // The temp file is written with 'is_sensitive' set, so its on-disk size
      // is expected to exceed the block's by the env's encryption header size
      // (presumably zero when encryption is disabled). The checksums must match.
      ASSERT_EQ(local_size, session_size - Env::Default()->GetEncryptionHeaderSize());
      ASSERT_EQ(local_crc, session_crc);
    }
  }
}

// Ensure that blocks are still readable through the open session even
// after they've been deleted.
TEST_P(TabletCopyTest, TestBlocksAreFetchableAfterBeingDeleted) {
  TabletSuperBlockPB superblock;
  ASSERT_OK(tablet()->metadata()->ToSuperBlock(&superblock));

  // Collect the ID of every data block the superblock references: REDO and
  // UNDO deltas, columns, and the optional bloom and ad hoc index blocks.
  vector<BlockId> blocks;
  const auto add = [&blocks](const auto& block_pb) {
    blocks.push_back(BlockId::FromPB(block_pb));
  };
  for (const RowSetDataPB& rs : superblock.rowsets()) {
    for (const DeltaDataPB& redo : rs.redo_deltas()) {
      add(redo.block());
    }
    for (const DeltaDataPB& undo : rs.undo_deltas()) {
      add(undo.block());
    }
    for (const ColumnDataPB& col : rs.columns()) {
      add(col.block());
    }
    if (rs.has_bloom_block()) {
      add(rs.bloom_block());
    }
    if (rs.has_adhoc_index_block()) {
      add(rs.adhoc_index_block());
    }
  }

  // Delete all of them in one transaction and verify every one was removed.
  shared_ptr<BlockDeletionTransaction> txn =
      fs_manager()->block_manager()->NewDeletionTransaction();
  for (const BlockId& id : blocks) {
    txn->AddDeletedBlock(id);
  }
  vector<BlockId> deleted;
  ASSERT_OK(txn->CommitDeletedBlocks(&deleted));
  ASSERT_EQ(blocks.size(), deleted.size());

  // The session should still have each block open, so fetching it must succeed.
  for (const BlockId& id : blocks) {
    ASSERT_TRUE(session_->IsBlockOpenForTests(id));
    string piece;
    TabletCopyErrorPB::Code err;
    int64_t size;
    ASSERT_OK(session_->GetBlockPiece(id, 0, 0, &piece, &size, &err));
  }
}

}  // namespace tserver
}  // namespace kudu
