// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/integration-tests/log_verifier.h"

#include <cstdint>
#include <iterator>
#include <map>
#include <memory>
#include <optional>
#include <ostream>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include <glog/logging.h>

#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/log.pb.h"
#include "kudu/consensus/log_index.h"
#include "kudu/consensus/log_reader.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/mini_cluster_fs_inspector.h"
#include "kudu/mini-cluster/mini_cluster.h"
#include "kudu/util/env.h"
#include "kudu/util/metrics.h"
#include "kudu/util/path_util.h"
#include "kudu/util/status.h"

using std::map;
using std::set;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;

namespace kudu {

using cluster::MiniCluster;
using consensus::OpId;
using itest::MiniClusterFsInspector;
using log::LogReader;

// Binds the verifier to 'cluster' and caches the cluster's Env.
// NOTE(review): 'cluster' is taken as a raw pointer and stored, so it presumably
// must outlive this verifier; confirm against the header.
LogVerifier::LogVerifier(cluster::MiniCluster* cluster)
    : cluster_(cluster),
      env_(cluster->env()) {
  // The filesystem inspector is what the scanning methods use to list tablets
  // and to locate each tablet server's WAL directory.
  inspector_.reset(new MiniClusterFsInspector(cluster_));
  CHECK(inspector_);
}

// Defaulted, but defined out of line. NOTE(review): presumably so the header can
// hold owning pointers to types it only forward-declares; confirm.
LogVerifier::~LogVerifier() = default;

// Scans every WAL segment of 'tablet_id' on tablet server 'ts_idx' and records
// the committed op id of each COMMIT entry into 'index_to_term' as
// [index -> term].
//
// Within a segment, reading stops at end-of-file or at the first entry that
// fails with Corruption; the scan then continues with the next segment.
// NOTE(review): tolerating corruption is presumably meant to cope with a
// partially written log tail; confirm.
//
// Returns an error if the log cannot be opened or an entry read fails with a
// non-EOF, non-Corruption status. Returns Corruption if the same index is seen
// in two COMMIT entries, which also covers an index already present in
// 'index_to_term' on entry.
Status LogVerifier::ScanForCommittedOpIds(int ts_idx, const string& tablet_id,
                                          map<int64_t, int64_t>* index_to_term) {
  const string tablet_wal_dir = JoinPathSegments(inspector_->WalDirForTS(ts_idx), tablet_id);

  shared_ptr<LogReader> log_reader;
  RETURN_NOT_OK(LogReader::Open(cluster_->ts_env(ts_idx),
                                tablet_wal_dir,
                                /*index*/nullptr,
                                tablet_id,
                                /*metric_entity*/nullptr,
                                /*file_cache*/nullptr,
                                &log_reader));

  log::SegmentSequence segments;
  log_reader->GetSegmentsSnapshot(&segments);

  // Reused across all segments; ReadNextEntry() refills it on each call.
  unique_ptr<log::LogEntryPB> entry;
  for (const auto& segment : segments) {
    log::LogEntryReader entry_reader(segment.get());
    for (;;) {
      Status read_status = entry_reader.ReadNextEntry(&entry);
      if (read_status.IsEndOfFile() || read_status.IsCorruption()) {
        break;
      }
      RETURN_NOT_OK(read_status);

      if (entry->type() == log::COMMIT) {
        const auto& committed = entry->commit().commited_op_id();
        if (!InsertIfNotPresent(index_to_term, committed.index(), committed.term())) {
          // The index is already present, so at() cannot throw here.
          return Status::Corruption(Substitute(
              "Index $0 had two COMMIT messages: $1.$0 and $2.$0",
              committed.index(), committed.term(), index_to_term->at(committed.index())));
        }
      }
    }
  }

  return Status::OK();
}

// Finds the COMMIT with the highest op index in the WAL of 'tablet_id' on
// tablet server 'ts_idx' and writes its [term, index] into 'commit_id'.
//
// Returns NotFound if the log contains no COMMIT entries. Any scan error is
// returned with the server's WAL root directory prepended for context.
Status LogVerifier::ScanForHighestCommittedOpIdInLog(int ts_idx,
                                                     const string& tablet_id,
                                                     OpId* commit_id) {
  const string wal_dir = inspector_->WalDirForTS(ts_idx);

  map<int64_t, int64_t> committed;
  RETURN_NOT_OK_PREPEND(ScanForCommittedOpIds(ts_idx, tablet_id, &committed),
                        Substitute("Couldn't scan log in dir $0", wal_dir));
  if (committed.empty()) {
    return Status::NotFound("no COMMITs in log");
  }

  // std::map is ordered by key, so the last element holds the highest index.
  const auto& [highest_index, highest_term] = *committed.rbegin();
  commit_id->set_index(highest_index);
  commit_id->set_term(highest_term);
  return Status::OK();
}

// Verifies, for every tablet the filesystem inspector lists, that the tablet
// servers agree on the term of every committed op index.
//
// For each tablet, each server that has a WAL directory for it is scanned for
// COMMIT entries to build an [index -> term] map; servers without that
// directory are skipped. An index committed on only a subset of servers is
// accepted. The check fails only when two servers recorded different terms for
// the same index.
//
// Returns Corruption listing every server's term for the first mismatching
// index (servers without the op show the sentinel -1), or any error from
// scanning a server's log.
Status LogVerifier::VerifyCommittedOpIdsMatch() {
  // Marks "this server has no COMMIT for the index". NOTE(review): assumes real
  // terms are never negative.
  constexpr int64_t kNotOnThisServer = -1;

  for (const string& tablet_id : inspector_->ListTablets()) {
    LOG(INFO) << "Checking tablet " << tablet_id;

    // Union set of the op indexes seen on any server.
    set<int64_t> all_op_indexes;
    // For each server in the cluster, a map of [index->term].
    vector<map<int64_t, int64_t>> maps_by_ts(cluster_->num_tablet_servers());

    // Gather the [index->term] map for each of the tablet servers
    // hosting this tablet.
    for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
      const string wal_dir = JoinPathSegments(inspector_->WalDirForTS(i), tablet_id);
      // NOTE(review): existence is checked through the cluster-wide env_, while
      // ScanForCommittedOpIds() reads via cluster_->ts_env(i). Confirm these
      // refer to the same filesystem for every cluster type.
      if (!env_->FileExists(wal_dir)) continue;
      map<int64_t, int64_t>& index_to_term = maps_by_ts[i];
      RETURN_NOT_OK_PREPEND(ScanForCommittedOpIds(i, tablet_id, &index_to_term),
                            Substitute("Couldn't scan log for TS $0", i));
      for (const auto& index_term : index_to_term) {
        all_op_indexes.insert(index_term.first);
      }
    }

    // Check that the terms match up across servers.
    vector<int64_t> committed_terms;
    committed_terms.reserve(maps_by_ts.size());
    for (int64_t index : all_op_indexes) {
      committed_terms.clear();
      for (int ts = 0; ts < cluster_->num_tablet_servers(); ts++) {
        committed_terms.push_back(FindWithDefault(maps_by_ts[ts], index, kNotOnThisServer));
      }
      // 'committed_terms' entries should all be kNotOnThisServer or the same as each other.
      // Terms are 64-bit; holding them in optional<int> would truncate large
      // terms and report spurious mismatches.
      std::optional<int64_t> expected_term;
      for (int ts = 0; ts < cluster_->num_tablet_servers(); ts++) {
        const int64_t this_ts_term = committed_terms[ts];
        if (this_ts_term == kNotOnThisServer) continue; // this TS doesn't have the op
        if (!expected_term.has_value()) {
          expected_term = this_ts_term;
        } else if (this_ts_term != *expected_term) {
          string err = Substitute("Mismatch found for index $0, [", index);
          for (int i = 0; i < cluster_->num_tablet_servers(); i++) {
            if (i != 0) err += ", ";
            strings::SubstituteAndAppend(&err, "T $0=$1",
                                         cluster_->UuidForTS(i),
                                         committed_terms[i]);
          }
          err += "]";
          return Status::Corruption(err);
        }
      }
    }

    LOG(INFO) << "Verified matching terms for " << all_op_indexes.size() << " ops in tablet "
              << tablet_id;
  }
  return Status::OK();
}

} // namespace kudu
