// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <fcntl.h>
#include <unistd.h>

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <ostream>
#include <random>
#include <string>
#include <thread>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>

#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/walltime.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/hdr_histogram.h"
#include "kudu/util/logging.h"
#include "kudu/util/monotime.h"
#include "kudu/util/path_util.h"
#include "kudu/util/stopwatch.h"

DEFINE_int32(num_files, 40, "number of files to write");
DEFINE_int32(file_size_mb, 16, "size of each file");
DEFINE_int32(num_rounds, 1, "number of times to try each setup");
DEFINE_int32(wal_interval_us, 1000, "number of microseconds to sleep between WAL writes");
DEFINE_string(file_path, "", "path where all files are written; defaults to cwd");

DEFINE_bool(initiate_writeback_each_file, false,
            "SYNC_FILE_RANGE_WRITE each file before closing it");
DEFINE_bool(await_writeback_each_file, false,
            "SYNC_FILE_RANGE_WAIT_BEFORE each file before closing it");
DEFINE_bool(fdatasync_each_file, false,
            "fdatasync() each file before closing it");

DEFINE_bool(initiate_writeback_at_end, false,
            "SYNC_FILE_RANGE_WRITE each file after writing all files");
DEFINE_bool(await_writeback_at_end, false,
            "SYNC_FILE_RANGE_WAIT_BEFORE each file after writing all files");
DEFINE_bool(fdatasync_at_end, true,
            "fdatasync each file after writing all files");

DEFINE_bool(page_align_wal_writes, false,
            "write to the fake WAL with exactly 4KB writes to never cross pages");

using std::string;
using std::thread;
using std::vector;

namespace kudu {

class WalHiccupBenchmarker {
 public:
  WalHiccupBenchmarker()
    : finished_(1),
      cur_histo_(NULL) {
  }
  ~WalHiccupBenchmarker() {
    STLDeleteElements(&wal_histos_);
  }

  void WALThread();
  void PrintConfig();
  void RunOnce();
  void Run();
 protected:
  CountDownLatch finished_;
  std::vector<HdrHistogram*> wal_histos_;
  HdrHistogram* cur_histo_;
};


void WalHiccupBenchmarker::WALThread() {
  string name = "wal";
  if (!FLAGS_file_path.empty()) {
    name = JoinPathSegments(FLAGS_file_path, name);
  }
  int fd = open(name.c_str(), O_WRONLY | O_TRUNC | O_CREAT, 0666);
  PCHECK(fd >= 0) << "open() failed";
  char buf[4096];
  memset(buf, 0xFF, sizeof(buf));
  const MonoDelta sleepDelta = MonoDelta::FromMicroseconds(FLAGS_wal_interval_us);
  while (finished_.count() > 0) {
    SleepFor(sleepDelta);
    MicrosecondsInt64 st = GetCurrentTimeMicros();
    size_t num_bytes = FLAGS_page_align_wal_writes ? sizeof(buf) : sizeof(buf) - 1;
    PCHECK(write(fd, buf, num_bytes) == num_bytes);
    PCHECK(fdatasync(fd) == 0);
    MicrosecondsInt64 et = GetCurrentTimeMicros();
    MicrosecondsInt64 value = et - st;
    cur_histo_->IncrementWithExpectedInterval(value, FLAGS_wal_interval_us);
    if (value > FLAGS_wal_interval_us) {
      LOG(WARNING) << "slow wal write: " <<  value << "us";
    }
  }
}

void WriteFile(const string& name,
               bool initiate_writeback,
               bool wait_writeback,
               bool datasync,
               int* fd) {
  string full_name = name;
  if (!FLAGS_file_path.empty()) {
    full_name = JoinPathSegments(FLAGS_file_path, full_name);
  }
  *fd = open(full_name.c_str(), O_WRONLY|O_TRUNC|O_CREAT, 0666);
  PCHECK(*fd >= 0) << "open() failed";

  char buf[1024];
  memset(buf, 0xFF, sizeof(buf));
  for (int i = 0; i < FLAGS_file_size_mb; i++) {
    for (int j = 0; j < 1024; j++) {
      PCHECK(write(*fd, buf, sizeof(buf)) > 0);
    }
  }

  if (initiate_writeback) {
    PCHECK(sync_file_range(*fd, 0, 0, SYNC_FILE_RANGE_WRITE) == 0);
  }

  if (wait_writeback) {
    PCHECK(sync_file_range(*fd, 0, 0, SYNC_FILE_RANGE_WAIT_BEFORE) == 0);
  }

  if (datasync) {
    PCHECK(fdatasync(*fd) == 0);
  }
}

void SetFlags(uint32_t setup) {
  FLAGS_initiate_writeback_each_file = setup & (1 << 0);
  FLAGS_await_writeback_each_file = setup & (1 << 1);
  FLAGS_fdatasync_each_file = setup & (1 << 2);

  FLAGS_initiate_writeback_at_end = setup & (1 << 3);
  FLAGS_await_writeback_at_end = setup & (1 << 4);
  FLAGS_fdatasync_at_end = setup & (1 << 5);

  FLAGS_page_align_wal_writes = setup & (1 << 6);
}

void WalHiccupBenchmarker::Run() {
  int num_setups = 1 << 7;
  wal_histos_.resize(num_setups);

  vector<double> total_time;
  total_time.resize(num_setups);

  vector<uint32_t> setups;
  setups.reserve(num_setups);
  for (uint32_t setup = 0; setup < num_setups; setup++) {
    setups.push_back(setup);
  }

  std::random_device rdev;
  std::mt19937 gen(rdev());
  for (int round = 0; round < FLAGS_num_rounds; round++) {
    // Randomize the order of setups in each round.
    std::shuffle(setups.begin(), setups.end(), gen);

    for (uint32_t setup : setups) {
      SetFlags(setup);
      if (!FLAGS_fdatasync_each_file && !FLAGS_fdatasync_at_end) {
        // Skip non-durable configuration
        continue;
      }

      LOG(INFO) << "----------------------------------------------------------------------";
      LOG(INFO) << "Continuing setup " << setup << ":";
      PrintConfig();
      LOG(INFO) << "----------------------------------------------------------------------";

      sync();
      if (wal_histos_[setup] == NULL) {
        wal_histos_[setup] = new HdrHistogram(1000000, 4);
      }
      cur_histo_ = wal_histos_[setup];

      Stopwatch s;
      s.start();
      RunOnce();
      s.stop();
      total_time[setup] += s.elapsed().wall_seconds();
    }
    LOG(INFO) << "----------------------------------------------------------------------";
    LOG(INFO) << "Ran " << setups.size() << " setups";
    LOG(INFO) << "----------------------------------------------------------------------";
  }

  for (uint32_t setup : setups) {
    SetFlags(setup);
    if (!FLAGS_fdatasync_each_file && !FLAGS_fdatasync_at_end) {
      // Skip non-durable configuration
      continue;
    }

    cur_histo_ = wal_histos_[setup];

    double throughput = (FLAGS_num_rounds * FLAGS_num_files * FLAGS_file_size_mb) /
      total_time[setup];

    LOG(INFO) << "----------------------------------------------------------------------";
    LOG(INFO) << "Test results for setup " << setup << ":";
    PrintConfig();
    LOG(INFO);
    LOG(INFO) << "throughput: " << throughput;
    LOG(INFO) << "p95: " << cur_histo_->ValueAtPercentile(95.0);
    LOG(INFO) << "p99: " << cur_histo_->ValueAtPercentile(99.0);
    LOG(INFO) << "p99.99: " << cur_histo_->ValueAtPercentile(99.99);
    LOG(INFO) << "max: " << cur_histo_->MaxValue();
    LOG(INFO) << "----------------------------------------------------------------------";
  }

}

void WalHiccupBenchmarker::PrintConfig() {
  LOG(INFO) << "initiate_writeback_each_file: " << FLAGS_initiate_writeback_each_file;
  LOG(INFO) << "await_writeback_each_file: " << FLAGS_await_writeback_each_file;
  LOG(INFO) << "fdatasync_each_file: " << FLAGS_fdatasync_each_file;
  LOG(INFO) << "initiate_writeback_at_end: " << FLAGS_initiate_writeback_at_end;
  LOG(INFO) << "await_writeback_at_end: " << FLAGS_await_writeback_at_end;
  LOG(INFO) << "fdatasync_at_end: " << FLAGS_fdatasync_at_end;
  LOG(INFO) << "page_align_wal_writes: " << FLAGS_page_align_wal_writes;
}

void WalHiccupBenchmarker::RunOnce() {
  finished_.Reset(1);
  thread thr([this]() { this->WALThread(); });
  int fds[FLAGS_num_files];
  for (int i = 0; i < FLAGS_num_files; i++) {
    WriteFile(strings::Substitute("file-$0", i),
              FLAGS_initiate_writeback_each_file,
              FLAGS_await_writeback_each_file,
              FLAGS_fdatasync_each_file,
              &fds[i]);
  }

  LOG(INFO) << "Done writing...";
  if (FLAGS_initiate_writeback_at_end) {
    LOG(INFO) << "Post-write initiating writeback...";
    for (int i = 0; i < FLAGS_num_files; i++) {
      PCHECK(sync_file_range(fds[i], 0, 0, SYNC_FILE_RANGE_WRITE) == 0);
    }
  }
  if (FLAGS_await_writeback_at_end) {
    LOG(INFO) << "Post-write awaiting writeback...";
    for (int i = 0; i < FLAGS_num_files; i++) {
      PCHECK(sync_file_range(fds[i], 0, 0, SYNC_FILE_RANGE_WAIT_BEFORE) == 0);
    }
  }
  if (FLAGS_fdatasync_at_end) {
    LOG(INFO) << "Post-write fdatasync...";
    for (int i = 0; i < FLAGS_num_files; i++) {
      PCHECK(fdatasync(fds[i]) == 0);
    }
  }
  for (int i = 0; i < FLAGS_num_files; i++) {
    PCHECK(close(fds[i]) == 0);
  }

  LOG(INFO) << "Done closing...";
  finished_.CountDown();
  thr.join();
}

} // namespace kudu

int main(int argc, char* argv[]) {
  google::ParseCommandLineFlags(&argc, &argv, true);
  kudu::InitGoogleLoggingSafe(argv[0]);

  kudu::WalHiccupBenchmarker benchmarker;
  benchmarker.Run();

  return 0;
}
