// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/common/wire_protocol.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
#include "kudu/integration-tests/mini_cluster_fs_inspector.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/tablet/metadata.pb.h"
#include "kudu/util/condition_variable.h"
#include "kudu/util/monotime.h"
#include "kudu/util/mutex.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/scoped_cleanup.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"

// Bounds the tombstone -> copy loop in TestTombstonedVotingUnderStress.
DEFINE_int32(test_num_iterations, 5,
             "Number of tombstoned voting stress test iterations");

using kudu::consensus::COMMITTED_OPID;
using kudu::consensus::OpId;
using kudu::itest::DeleteTablet;
using kudu::itest::TServerDetails;
using kudu::itest::WaitForServersToAgree;
using kudu::tablet::TABLET_DATA_TOMBSTONED;
using std::atomic;
using std::string;
using std::thread;
using std::unique_lock;
using std::vector;

namespace kudu {

// Timeout handed to itest helper calls made in this file, e.g. the
// itest::RequestVote() call issued by the voter loop.
static const MonoDelta kTimeout = MonoDelta::FromSeconds(30);

// Fixture for the tombstoned-voting stress test. The test thread moves the
// replica through the State values via SetState(), while worker threads poll
// GetState() and issue vote requests from RunVoteRequestLoop().
class TombstonedVotingStressTest : public ExternalMiniClusterITestBase {
 public:
  // All members carry default member initializers, so nothing to do here.
  TombstonedVotingStressTest() = default;

 protected:
  enum State {
    kRunning,       // The tablet is running normally.
    kTombstoning,   // We are tombstoning the tablet.
    kTombstoned,    // The tombstoning is complete.
    kCopying,       // We are copying the tablet.
    kTestComplete,  // The test is complete and about to exit.
  };

  // Returns the enumerator name of 'state' as a string.
  string State_Name(State state);

  // Worker-side accessor:
  //   1. If a state change is pending, block until it has been applied.
  //   2. Return the current state.
  State GetState();

  // Test-side mutator:
  //   1. Request that worker threads block.
  //   2. Wait until all of them have blocked.
  //   3. Store the new state.
  //   4. Release the workers.
  void SetState(State state);

  // Body of the worker thread: loops requesting votes from TS1 until the
  // state becomes kTestComplete.
  void RunVoteRequestLoop();

  // Set-once shared state.
  string tablet_id_;
  OpId last_logged_opid_;

  // Guards state_, block_workers_ and num_workers_blocked_. Declared before
  // the condition variables below, which are constructed with its address.
  Mutex lock_;

  // Number of worker threads SetState() waits for.
  const int num_workers_ = 1;

  // Number of workers currently parked inside GetState(). Protected by lock_.
  int num_workers_blocked_ = 0;

  // True while SetState() wants workers to park. Protected by lock_.
  bool block_workers_ = false;

  // Signaled by the worker that brings num_workers_blocked_ up to num_workers_.
  ConditionVariable cond_all_workers_blocked_{&lock_};

  // Broadcast by SetState() once the new state has been stored.
  ConditionVariable cond_workers_unblocked_{&lock_};

  // Protected by lock_.
  State state_ = kRunning;

  // State for the voter thread: incremented before every vote request, and
  // read by the test body when it starts a tablet copy.
  atomic<int64_t> current_term_{1};
};

// Returns the enumerator spelling for 'state'. A value that matches none of
// the enumerators is reported through LOG(FATAL).
string TombstonedVotingStressTest::State_Name(State state) {
  const char* name = nullptr;
  switch (state) {
    case kRunning:      name = "kRunning";      break;
    case kTombstoning:  name = "kTombstoning";  break;
    case kTombstoned:   name = "kTombstoned";   break;
    case kCopying:      name = "kCopying";      break;
    case kTestComplete: name = "kTestComplete"; break;
  }
  if (name == nullptr) {
    // No enumerator matched.
    LOG(FATAL) << "Unknown state: " << state;
    __builtin_unreachable();
  }
  return name;
}

// Called by worker threads. If SetState() has asked workers to block, this
// checks the caller in as blocked (waking SetState() once every worker has
// done so) and waits to be released. Returns state_ as read under lock_.
TombstonedVotingStressTest::State TombstonedVotingStressTest::GetState() {
  unique_lock<Mutex> guard(lock_);

  // No state change is pending: report the state right away.
  if (!block_workers_) {
    return state_;
  }

  // Check in as blocked; the worker that completes the set wakes SetState().
  if (++num_workers_blocked_ == num_workers_) {
    cond_all_workers_blocked_.Signal();
  }

  // block_workers_ is true at this point and lock_ is held, so waiting once
  // before re-testing the flag is equivalent to testing it first. The flag is
  // re-tested after every wakeup.
  do {
    cond_workers_unblocked_.Wait();
  } while (block_workers_);

  --num_workers_blocked_;
  return state_;
}

// Switches the shared state to 'state' while every worker is parked inside
// GetState(), then releases the workers.
void TombstonedVotingStressTest::SetState(State state) {
  LOG(INFO) << "setting state to " << State_Name(state);
  unique_lock<Mutex> guard(lock_);

  // Step 1: ask the workers to park on their next GetState() call.
  block_workers_ = true;

  // Step 2: wait until every worker has checked in as blocked.
  for (;;) {
    if (num_workers_blocked_ == num_workers_) break;
    cond_all_workers_blocked_.Wait();
  }

  // Step 3: publish the new state.
  state_ = state;

  // Step 4: release the workers.
  block_workers_ = false;
  cond_workers_unblocked_.Broadcast();
}

void TombstonedVotingStressTest::RunVoteRequestLoop() {
  TServerDetails* ts1_ets = ts_map_[cluster_->tablet_server(1)->uuid()];
  while (true) {
    State state = GetState();
    if (state == kTestComplete) break;
    ++current_term_;
    Status s = itest::RequestVote(ts1_ets, tablet_id_, "A", current_term_, last_logged_opid_,
                                  /*ignore_live_leader=*/ true, /*is_pre_election=*/ false,
                                  kTimeout);
    switch (state) {
      case kRunning: [[fallthrough]];
      case kTombstoned:
        // We should always be able to vote in this case.
        if (s.ok()) {
          LOG(INFO) << "Vote OK: state = " << state;
        } else {
          LOG(FATAL) << s.ToString() << ": tablet = " << tablet_id_ << ": state = " << state;
        }
        break;

      // The vote can fail while in the process of tombstoning a replica
      // because there is a small window of time where we have stopped
      // RaftConsensus but we haven't yet recorded the last-logged opid in the
      // tablet metadata.
      case kTombstoning: [[fallthrough]];
      case kCopying:
        if (s.ok()) {
          LOG(INFO) << "Vote OK: state = " << state;
        } else {
          LOG(WARNING) << "Got bad vote while copying or tombstoning: " << s.ToString()
                       << ": state = " << state;
        }
        break;

      default:
        // We're shutting down.
        continue;
    }
    SleepFor(MonoDelta::FromMilliseconds(1)); // Don't run too hot.
  }
}

// Stress test for tombstoned voting, including tombstoning, deleting, and
// copying replicas.
TEST_F(TombstonedVotingStressTest, TestTombstonedVotingUnderStress) {
  // This test waits for several seconds, so only run it in slow mode.
  SKIP_IF_SLOW_NOT_ALLOWED();

  // Timeouts below use the namespace-scope kTimeout; a local constant with the
  // same name and value would only shadow it.

  // We want to control leader election manually and we only want 2 replicas.
  NO_FATALS(StartCluster({ "--enable_leader_failure_detection=false" },
                         { "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
                           "--allow_unsafe_replication_factor=true" },
                         /*num_tablet_servers=*/ 2));
  TestWorkload workload(cluster_.get());
  workload.set_num_replicas(2); // Two servers and replicas makes the test easy to debug.
  workload.Setup();
  ASSERT_OK(inspect_->WaitForReplicaCount(2));

  // Figure out the tablet id.
  vector<string> tablets = inspect_->ListTabletsOnTS(1);
  ASSERT_EQ(1, tablets.size());
  tablet_id_ = tablets[0];

  // NOTE(review): this loop starts at index 1, so it does not wait on tablet
  // server 0; the StartElection() retry below looks like it covers that
  // replica -- confirm the starting index is intentional.
  for (int i = 1; i < cluster_->num_tablet_servers(); i++) {
    ASSERT_OK(itest::WaitUntilTabletRunning(ts_map_[cluster_->tablet_server(i)->uuid()],
                                            tablet_id_, kTimeout));
    LOG(INFO) << "TabletReplica is RUNNING: T " << tablet_id_
              << " P " << cluster_->tablet_server(i)->uuid();
  }

  // Elect a leader and run some data through the cluster.
  LOG(INFO) << "electing a leader...";
  TServerDetails* ts0_ets = ts_map_[cluster_->tablet_server(0)->uuid()];
  TServerDetails* ts1_ets = ts_map_[cluster_->tablet_server(1)->uuid()];
  ASSERT_EVENTUALLY([&] {
    // The tablet can report that it's running but still be bootstrapping, so
    // retry until the election starts.
    ASSERT_OK(itest::StartElection(ts0_ets, tablet_id_, kTimeout));
  });

  LOG(INFO) << "loading data...";
  workload.Start();
  while (workload.rows_inserted() < 100) {
    SleepFor(MonoDelta::FromMilliseconds(10));
  }
  workload.StopAndJoin();
  ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id_, workload.batches_completed()));
  ASSERT_OK(itest::GetLastOpIdForReplica(tablet_id_, ts0_ets, COMMITTED_OPID, kTimeout,
                                         &last_logged_opid_));

  // Have the leader step down so we can test voting on the other replica.
  // We don't shut this node down because it will serve as the tablet copy
  // "source" during the test.
  LOG(INFO) << "forcing leader to step down...";
  // The leader role cannot fluctuate in this scenario because of
  // --enable_leader_failure_detection=false setting.
  ASSERT_OK(itest::LeaderStepDown(ts0_ets, tablet_id_, kTimeout));

  // Now we are done with setup. Start the "stress" part of the test.
  // Start up the voting thread.
  LOG(INFO) << "starting stress thread...";
  thread voter_thread([this] { RunVoteRequestLoop(); });
  // Runs on every exit path (including a failed ASSERT_* below): tells the
  // voter thread to stop and then joins it.
  SCOPED_CLEANUP({
    SetState(kTestComplete);
    voter_thread.join();
  });

  // 'iter' is 1-based so the progress message reads "iteration 1 of N"
  // through "iteration N of N".
  for (int iter = 1; iter <= FLAGS_test_num_iterations; ++iter) {
    LOG(INFO) << "iteration " << iter << " of " << FLAGS_test_num_iterations;
    // Loop on voting for a while in running state. We want to give an
    // opportunity for many votes during this time, and since voting involves
    // fsyncing to disk, we wait for plenty of time here (and below).
    SleepFor(MonoDelta::FromMilliseconds(500));

    // 1. Tombstone tablet.
    LOG(INFO) << "tombstoning tablet...";
    SetState(kTombstoning);
    ASSERT_OK(DeleteTablet(ts1_ets, tablet_id_, TABLET_DATA_TOMBSTONED, kTimeout));
    SetState(kTombstoned);

    // Loop on voting for a while in tombstoned state.
    SleepFor(MonoDelta::FromMilliseconds(500));

    // 2. Copy tablet.
    LOG(INFO) << "copying tablet...";
    HostPort source_hp = HostPortFromPB(ts0_ets->registration.rpc_addresses(0));
    SetState(kCopying);
    ASSERT_OK(itest::StartTabletCopy(ts1_ets, tablet_id_, ts0_ets->uuid(), source_hp, current_term_,
                                     kTimeout));
    LOG(INFO) << "waiting for servers to agree...";
    ASSERT_OK(WaitForServersToAgree(kTimeout, ts_map_, tablet_id_, workload.batches_completed()));

    SetState(kRunning);
  }
}

} // namespace kudu
