// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/tserver/tablet_server-test-base.h"

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/common/common.pb.h"
#include "kudu/common/iterator.h"
#include "kudu/common/partial_row.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/rowblock_memory.h"
#include "kudu/common/wire_protocol-test-util.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc_controller.h"
#include "kudu/tablet/local_tablet_writer.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/mini_tablet_server.h"
#include "kudu/tserver/tablet_server.h"
#include "kudu/tserver/tablet_server_options.h"
#include "kudu/tserver/tablet_server_test_util.h"
#include "kudu/tserver/ts_tablet_manager.h"
#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/pb_util.h"
#include "kudu/util/slice.h"
#include "kudu/util/test_graph.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"

DEFINE_int32(rpc_timeout, 1000, "Timeout for RPC calls, in seconds");
DECLARE_bool(enable_maintenance_manager);
DECLARE_int32(heartbeat_rpc_timeout_ms);

METRIC_DEFINE_entity(test);

using kudu::pb_util::SecureDebugString;
using kudu::pb_util::SecureShortDebugString;
using kudu::rpc::RpcController;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;

namespace kudu {
namespace tserver {

const char* TabletServerTestBase::kTableId = "TestTable";
const char* TabletServerTestBase::kTabletId = "ffffffffffffffffffffffffffffffff";

TabletServerTestBase::TabletServerTestBase()
    : schema_(GetSimpleTestSchema()),
      ts_test_metric_entity_(METRIC_ENTITY_test.Instantiate(
                                 &ts_test_metric_registry_, "ts_server-test")) {
  // Disable the maintenance ops manager since we want to trigger our own
  // maintenance operations at predetermined times.
  FLAGS_enable_maintenance_manager = false;

  // Decrease heartbeat timeout: we keep re-trying heartbeats when a
  // single master server fails due to a network error. Decreasing
  // the hearbeat timeout to 1 second speeds up unit tests which
  // purposefully specify non-running Master servers.
  FLAGS_heartbeat_rpc_timeout_ms = 1000;
}

// Starts the tablet server, override to start it later.
void TabletServerTestBase::SetUp() {
  KuduTest::SetUp();

  key_schema_ = schema_.CreateKeyProjection();
  rb_.reset(new RowBuilder(&schema_));

  rpc::MessengerBuilder bld("Client");
  ASSERT_OK(bld.Build(&client_messenger_));
}

void TabletServerTestBase::StartTabletServer(int num_data_dirs) {
  CHECK(!mini_server_);

  // Start server with an invalid master address, so it never successfully
  // heartbeats, even if there happens to be a master running on this machine.
  mini_server_.reset(new MiniTabletServer(GetTestPath("TabletServerTest-fsroot"),
                                          HostPort("127.0.0.1", 0), num_data_dirs));
  mini_server_->options()->master_addresses.clear();
  mini_server_->options()->master_addresses.emplace_back("255.255.255.255", 1);
  ASSERT_OK(mini_server_->Start());

  // Set up a tablet inside the server.
  ASSERT_OK(mini_server_->AddTestTablet(kTableId, kTabletId, schema_));
  ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kTabletId,
                                                                     &tablet_replica_));

  // Creating a tablet is async, we wait here instead of having to handle errors later.
  ASSERT_OK(WaitForTabletRunning(kTabletId));

  // Connect to it.
  ResetClientProxies();
}

Status TabletServerTestBase::WaitForTabletRunning(const char *tablet_id) {
  scoped_refptr<tablet::TabletReplica> tablet_replica;
  const auto* tablet_manager = mini_server_->server()->tablet_manager();
  const auto kTimeout = MonoDelta::FromSeconds(10);
  RETURN_NOT_OK(tablet_manager->GetTabletReplica(tablet_id, &tablet_replica));
  RETURN_NOT_OK(tablet_replica->WaitUntilConsensusRunning(kTimeout));
  RETURN_NOT_OK(
      tablet_replica->consensus()->WaitUntilLeaderForTests(kTimeout));

  // KUDU-2463: Even though the tablet thinks its leader, for correctness, it
  // must wait to finish replicating its no-op (even as a single replica)
  // before being available to scans.
  MonoTime deadline = MonoTime::Now() + kTimeout;
  while (!tablet_replica->tablet()->mvcc_manager()->CheckIsCleanTimeInitialized().ok()) {
    if (MonoTime::Now() >= deadline) {
      return Status::TimedOut("mvcc did not advance safe time within timeout");
    }
    SleepFor(MonoDelta::FromMilliseconds(10));
  }

  // KUDU-2444: Even though the tablet replica is fully running, the tablet
  // manager may regard it as still transitioning to the running state.
  return tablet_manager->WaitForNoTransitionsForTests(MonoDelta::FromSeconds(10));
}

void TabletServerTestBase::UpdateTestRowRemote(int32_t row_idx,
                                               int32_t new_val,
                                               TimeSeries* ts) {
  WriteRequestPB req;
  req.set_tablet_id(kTabletId);
  ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));

  WriteResponsePB resp;
  RpcController controller;
  controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout));
  string new_string_val(Substitute("mutated$0", row_idx));

  AddTestRowToPB(RowOperationsPB::UPDATE, schema_, row_idx, new_val, new_string_val,
                 req.mutable_row_operations());
  ASSERT_OK(proxy_->Write(req, &resp, &controller));

  SCOPED_TRACE(SecureDebugString(resp));
  ASSERT_FALSE(resp.has_error()) << SecureShortDebugString(resp);
  ASSERT_EQ(0, resp.per_row_errors_size());
  if (ts) {
    ts->AddValue(1);
  }
}

void TabletServerTestBase::ResetClientProxies() {
  CreateTsClientProxies(mini_server_->bound_rpc_addr(),
                        client_messenger_,
                        &tablet_copy_proxy_,
                        &proxy_,
                        &admin_proxy_,
                        &consensus_proxy_,
                        &generic_proxy_);
}

// Inserts 'num_rows' test rows directly into the tablet (i.e not via RPC)
void TabletServerTestBase::InsertTestRowsDirect(int32_t start_row,
                                                int32_t num_rows,
                                                const string& tablet_id) {
  scoped_refptr<tablet::TabletReplica> tablet_replica;
  ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(tablet_id,
                                                                     &tablet_replica));
  tablet::LocalTabletWriter writer(tablet_replica->tablet(), &schema_);
  KuduPartialRow row(&schema_);
  for (int32_t i = 0; i < num_rows; i++) {
    BuildTestRow(start_row + i, &row);
    CHECK_OK(writer.Insert(row));
  }
}

// Inserts 'num_rows' test rows remotely into the tablet (i.e via RPC)
// Rows are grouped in batches of 'count'/'num_batches' size.
// Batch size defaults to 1.
void TabletServerTestBase::InsertTestRowsRemote(
    int32_t first_row,
    int32_t count,
    int32_t num_batches,
    TabletServerServiceProxy* proxy,
    string tablet_id,
    vector<uint64_t>* write_timestamps_collector,
    TimeSeries* ts,
    bool string_field_defined) {
  if (!proxy) {
    proxy = proxy_.get();
  }

  if (num_batches == -1) {
    num_batches = count;
  }

  WriteRequestPB req;
  req.set_tablet_id(std::move(tablet_id));

  WriteResponsePB resp;
  RpcController controller;

  RowOperationsPB* data = req.mutable_row_operations();

  ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));

  int32_t inserted_since_last_report = 0;
  for (int i = 0; i < num_batches; ++i) {

    // reset the controller and the request
    controller.Reset();
    controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout));
    data->Clear();

    int32_t first_row_in_batch = first_row + (i * count / num_batches);
    int32_t last_row_in_batch = first_row_in_batch + count / num_batches;

    for (int32_t j = first_row_in_batch; j < last_row_in_batch; j++) {
      string str_val = Substitute("original$0", j);
      const char* cstr_val = str_val.c_str();
      if (!string_field_defined) {
        cstr_val = NULL;
      }
      AddTestRowWithNullableStringToPB(RowOperationsPB::INSERT, schema_, j, j,
                                       cstr_val, data);
    }
    CHECK_OK(DCHECK_NOTNULL(proxy)->Write(req, &resp, &controller));
    if (write_timestamps_collector) {
      write_timestamps_collector->push_back(resp.timestamp());
    }

    if (resp.has_error() || resp.per_row_errors_size() > 0) {
      LOG(FATAL) << "Failed to insert batch "
                 << first_row_in_batch << "-" << last_row_in_batch
                 << ": " << SecureDebugString(resp);
    }

    inserted_since_last_report += count / num_batches;
    if ((inserted_since_last_report > 100) && ts) {
      ts->AddValue(static_cast<double>(inserted_since_last_report));
      inserted_since_last_report = 0;
    }
  }

  if (ts) {
    ts->AddValue(static_cast<double>(inserted_since_last_report));
  }
}

// Delete specified test row range.
void TabletServerTestBase::DeleteTestRowsRemote(int32_t first_row,
                                                int32_t count,
                                                TabletServerServiceProxy* proxy,
                                                string tablet_id) {
  if (!proxy) {
    proxy = proxy_.get();
  }

  WriteRequestPB req;
  WriteResponsePB resp;
  RpcController controller;

  req.set_tablet_id(std::move(tablet_id));
  ASSERT_OK(SchemaToPB(schema_, req.mutable_schema()));

  RowOperationsPB* ops = req.mutable_row_operations();
  for (int32_t rowid = first_row; rowid < first_row + count; rowid++) {
    AddTestKeyToPB(RowOperationsPB::DELETE, schema_, rowid, ops);
  }

  SCOPED_TRACE(SecureDebugString(req));
  ASSERT_OK(DCHECK_NOTNULL(proxy)->Write(req, &resp, &controller));
  SCOPED_TRACE(SecureDebugString(resp));
  ASSERT_FALSE(resp.has_error()) << SecureShortDebugString(resp);
}

void TabletServerTestBase::BuildTestRow(int index, KuduPartialRow* row) {
  ASSERT_OK(row->SetInt32(0, index));
  ASSERT_OK(row->SetInt32(1, index * 2));
  ASSERT_OK(row->SetStringCopy(2, StringPrintf("hello %d", index)));
}

void TabletServerTestBase::DrainScannerToStrings(const string& scanner_id,
                                                 const Schema& projection,
                                                 vector<string>* results,
                                                 TabletServerServiceProxy* proxy,
                                                 uint32_t call_seq_id) {
  if (!proxy) {
    proxy = proxy_.get();
  }

  RpcController rpc;
  rpc.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout));
  ScanRequestPB req;
  ScanResponsePB resp;
  req.set_scanner_id(scanner_id);

  // NOTE: we do not sort the results here, since this function is used
  // by test cases which are verifying the server side's ability to
  // do ordered scans.
  do {
    rpc.Reset();
    req.set_batch_size_bytes(10000);
    req.set_call_seq_id(call_seq_id);
    SCOPED_TRACE(SecureDebugString(req));
    ASSERT_OK(DCHECK_NOTNULL(proxy)->Scan(req, &resp, &rpc));
    SCOPED_TRACE(SecureDebugString(resp));
    ASSERT_FALSE(resp.has_error());

    StringifyRowsFromResponse(projection, rpc, &resp, results);
    call_seq_id += 1;
  } while (resp.has_more_results());
}

void TabletServerTestBase::StringifyRowsFromResponse(
    const Schema& projection,
    const RpcController& rpc,
    ScanResponsePB* resp,
    vector<string>* results) {
  RowwiseRowBlockPB* rrpb = resp->mutable_data();
  Slice direct, indirect; // sidecar data buffers
  ASSERT_OK(rpc.GetInboundSidecar(rrpb->rows_sidecar(), &direct));
  if (rrpb->has_indirect_data_sidecar()) {
    ASSERT_OK(rpc.GetInboundSidecar(rrpb->indirect_data_sidecar(),
            &indirect));
  }
  vector<const uint8_t*> rows;
  ASSERT_OK(ExtractRowsFromRowBlockPB(projection, *rrpb,
                                      indirect, &direct, &rows));
  VLOG(1) << "Round trip got " << rows.size() << " rows";
  for (const uint8_t* row_ptr : rows) {
    ConstContiguousRow row(&projection, row_ptr);
    results->push_back(projection.DebugRow(row));
  }
}

void TabletServerTestBase::ShutdownTablet() {
  if (mini_server_.get()) {
    // The TabletReplica must be destroyed before the TS, otherwise data
    // blocks may be destroyed after their owning block manager.
    tablet_replica_.reset();
    mini_server_->Shutdown();
    mini_server_.reset();
  }
}

Status TabletServerTestBase::ShutdownAndRebuildTablet(int num_data_dirs) {
  ShutdownTablet();

  // Start server.
  mini_server_.reset(new MiniTabletServer(GetTestPath("TabletServerTest-fsroot"),
                                          HostPort("127.0.0.1", 0), num_data_dirs));
  mini_server_->options()->master_addresses.clear();
  mini_server_->options()->master_addresses.emplace_back("255.255.255.255", 1);
  // this should open the tablet created on StartTabletServer()
  RETURN_NOT_OK(mini_server_->Start());

  // Don't RETURN_NOT_OK immediately -- even if we fail, we may still get a TabletReplica object
  // which has information about the failure.
  Status wait_status = mini_server_->WaitStarted();
  bool found_peer = mini_server_->server()->tablet_manager()->LookupTablet(
      kTabletId, &tablet_replica_);
  RETURN_NOT_OK(wait_status);
  if (!found_peer) {
    return Status::NotFound("Tablet was not found");
  }

  // Connect to it.
  ResetClientProxies();

  // Opening a tablet is async, we wait here instead of having to handle errors later.
  return WaitForTabletRunning(kTabletId);
}

// Verifies that a set of expected rows (key, value) is present in the tablet.
void TabletServerTestBase::VerifyRows(const Schema& schema,
                                      const vector<KeyValue>& expected) {
  unique_ptr<RowwiseIterator> iter;
  ASSERT_OK(tablet_replica_->tablet()->NewRowIterator(schema, &iter));
  ASSERT_OK(iter->Init(nullptr));

  int batch_size = std::max<int>(1,
     std::min<int>(expected.size() / 10,
                   4*1024*1024 / schema.byte_size()));

  RowBlockMemory mem(32 * 1024);
  RowBlock block(&schema, batch_size, &mem);

  int count = 0;
  while (iter->HasNext()) {
    mem.Reset();
    ASSERT_OK_FAST(iter->NextBlock(&block));
    RowBlockRow rb_row = block.row(0);
    for (int i = 0; i < block.nrows(); i++) {
      if (block.selection_vector()->IsRowSelected(i)) {
        rb_row.Reset(&block, i);
        VLOG(1) << "Verified row " << schema.DebugRow(rb_row);
        ASSERT_LT(count, expected.size()) << "Got more rows than expected!";
        EXPECT_EQ(expected[count].first, *schema.ExtractColumnFromRow<INT32>(rb_row, 0))
            << "Key mismatch at row: " << count;
        EXPECT_EQ(expected[count].second, *schema.ExtractColumnFromRow<INT32>(rb_row, 1))
            << "Value mismatch at row: " << count;
        count++;
      }
    }
  }
  ASSERT_EQ(count, expected.size());
}

void TabletServerTestBase::VerifyScanRequestFailure(const ScanRequestPB& req,
                                                    TabletServerErrorPB::Code expected_code,
                                                    const char *expected_message) {
  ScanResponsePB resp;
  RpcController rpc;

  // Send the call.
  {
    SCOPED_TRACE(SecureDebugString(req));
    ASSERT_OK(proxy_->Scan(req, &resp, &rpc));
    SCOPED_TRACE(SecureDebugString(resp));
    ASSERT_TRUE(resp.has_error());
    ASSERT_EQ(expected_code, resp.error().code());
    ASSERT_STR_CONTAINS(resp.error().status().message(), expected_message);
  }
}

void TabletServerTestBase::VerifyScanRequestFailure(const Schema& projection,
                                                    TabletServerErrorPB::Code expected_code,
                                                    const char *expected_message) {
  ScanRequestPB req;
  NewScanRequestPB* scan = req.mutable_new_scan_request();
  scan->set_tablet_id(kTabletId);
  ASSERT_OK(SchemaToColumnPBs(projection, scan->mutable_projected_columns()));
  req.set_call_seq_id(0);
  NO_FATALS(VerifyScanRequestFailure(req, expected_code, expected_message));
}

Status TabletServerTestBase::FillNewScanRequest(ReadMode read_mode, NewScanRequestPB* scan) const {
  scan->set_tablet_id(kTabletId);
  scan->set_read_mode(read_mode);
  return SchemaToColumnPBs(schema_, scan->mutable_projected_columns());
}

// Open a new scanner which scans all of the columns in the table.
void TabletServerTestBase::OpenScannerWithAllColumns(ScanResponsePB* resp,
                                                     ReadMode read_mode) {
  ScanRequestPB req;
  RpcController rpc;

  // Set up a new request with no predicates, all columns.
  ASSERT_OK(FillNewScanRequest(read_mode, req.mutable_new_scan_request()));
  req.set_call_seq_id(0);
  req.set_batch_size_bytes(0); // so it won't return data right away

  // Send the call
  {
    SCOPED_TRACE(SecureDebugString(req));
    ASSERT_OK(proxy_->Scan(req, resp, &rpc));
    SCOPED_TRACE(SecureDebugString(*resp));
    ASSERT_FALSE(resp->has_error());
    ASSERT_TRUE(resp->has_more_results());
  }
}

} // namespace tserver
} // namespace kudu
