// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <unistd.h>

#include <algorithm>
#include <cstdint>
#include <memory>
#include <ostream>
#include <string>
#include <utility>
#include <vector>

#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/client/schema.h"
#include "kudu/gutil/integral_types.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/integration-tests/cluster_itest_util.h"
#include "kudu/integration-tests/external_mini_cluster-itest-base.h"
#include "kudu/integration-tests/test_workload.h"
#include "kudu/mini-cluster/external_mini_cluster.h"
#include "kudu/util/env.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"

// Metric entity and gauge prototypes referenced by the test below as
// METRIC_ENTITY_server / METRIC_<name> when fetching values from the tablet
// server. They are only declared here (presumably defined by the server-side
// code that owns each metric).
METRIC_DECLARE_entity(server);
METRIC_DECLARE_gauge_uint64(block_manager_total_blocks_created);
METRIC_DECLARE_gauge_uint64(block_manager_total_bytes_written);
METRIC_DECLARE_gauge_uint64(block_manager_total_disk_sync);
METRIC_DECLARE_gauge_uint64(log_block_manager_blocks_under_management);
METRIC_DECLARE_gauge_uint64(log_block_manager_bytes_under_management);
METRIC_DECLARE_gauge_uint64(log_block_manager_containers);
METRIC_DECLARE_gauge_uint64(log_block_manager_full_containers);
METRIC_DECLARE_gauge_uint64(threads_running);

// Flags declared (not defined) in this file. The test reads their values:
// 'enable_rowset_compaction' is forwarded to the tserver command line and
// 'block_manager' selects which set of metrics is collected.
DECLARE_bool(enable_rowset_compaction);
DECLARE_string(block_manager);

// Flags defined by this test.
//
// The measure_startup_* flags control optional steps performed around the
// cluster restart whose duration is logged at the end of the test.
DEFINE_bool(measure_startup_drop_caches, false,
            "Whether to drop kernel caches before measuring startup time. Must be root");
DEFINE_bool(measure_startup_sync, false,
            "Whether to call sync() before measuring startup time");
DEFINE_bool(measure_startup_wait_for_bootstrap, false,
            "Whether to wait for all tablets to finish bootstrapping when measuring startup time");
// Knobs controlling the shape and duration of the generated workload.
DEFINE_int32(num_columns, 100, "Number of columns in each tablet");
DEFINE_int32(num_seconds, 10, "Number of seconds to run the test");
DEFINE_int32(num_tablets, 100, "Number of tablets to create");
// Passed to the tserver as --log_container_max_blocks.
DEFINE_int32(max_blocks_per_container, 8, "Block number limit for each LBM container");
// When set, --never_fsync=false is passed to both the master and the tserver.
DEFINE_bool(enable_fsync, false, "Whether to enable fsync (disabled by default "
            "for all ExternalMiniCluster tests)");

namespace kudu {

using client::KuduColumnSchema;
using client::KuduSchema;
using client::KuduSchemaBuilder;
using cluster::ExternalMiniClusterOptions;
using std::pair;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;

// Fixture for the dense node test, built on the external mini cluster test
// base. It is parameterized on a bool; RunTest passes
// --encrypt_data_at_rest=true to the master and tserver when the parameter is
// true.
class DenseNodeTest : public ExternalMiniClusterITestBase,
                      public testing::WithParamInterface<bool> {
};

// Run every DenseNodeTest case twice: once with the bool parameter set to
// false and once with it set to true.
INSTANTIATE_TEST_SUITE_P(, DenseNodeTest, testing::Bool());

// Integration test that simulates "dense" Kudu nodes.
//
// Storage heavy deployments can be created by running a data-intensive
// workload for a long time. But that's both time intensive and developer
// unfriendly. This test offers an alternative: a workload that produces a lot
// of metadata with a minimal amount of data. The scale of the metadata can
// proxy for data in areas we care about (such as start up time, thread count,
// memory usage, etc.).
//
// The test proceeds in three phases:
//   1. Start a cluster tuned to flush aggressively and run a write workload
//      against it for --num_seconds.
//   2. Collect a handful of block manager and thread metrics from the tserver,
//      then shut the cluster down.
//   3. Restart the master and tserver, timing each step (optionally after
//      calling sync() and/or dropping kernel caches, and optionally waiting
//      for tablets to be running), then log the metrics collected in phase 2.
//
// The bool test parameter controls whether data-at-rest encryption is enabled
// on both the master and the tserver.
TEST_P(DenseNodeTest, RunTest) {
  ExternalMiniClusterOptions opts;

  opts.extra_tserver_flags = {
      // Flush as fast as possible.
      "--flush_threshold_mb=1",
      "--flush_threshold_secs=1",

      // Don't preallocate anything, otherwise the massive number of LBM
      // containers and WAL segments are likely to fill the disk.
      "--log_preallocate_segments=false",
      "--log_async_preallocate_segments=false",
      "--log_container_preallocate_bytes=0",

      // No need for log retention, and this way we can save more disk space.
      "--log_min_segments_to_retain=1",

      // Drastically increase the number of LBM containers by limiting the
      // number of blocks in each.
      Substitute("--log_container_max_blocks=$0",
                 FLAGS_max_blocks_per_container),

      // UNDO delta block GC runs a lot to eagerly open newly created cfiles.
      // Disable it so we can maximize flushes.
      "--enable_undo_delta_block_gc=false",

      // Compactions aim to reduce the number of rowsets and thus reduce the
      // number of cfiles. Potentially disable them so we can maximize the
      // number of files. NOTE: this also means we don't exercise the block
      // deletions that come with compactions.
      Substitute("--enable_rowset_compaction=$0", FLAGS_enable_rowset_compaction),

      // Allow our single tserver to service many, many RPCs.
      "--rpc_service_queue_length=1000",

      // Inject steroids into the MM.
      "--maintenance_manager_num_threads=100",
      "--maintenance_manager_polling_interval_ms=1",

      // The tserver sometimes crashes with a SIGSEGV in the metrics logging
      // thread while trying to unwind a stack from within tcmalloc. It's
      // unclear as to why, but disabling the logging appears to fix it.
      "--metrics_log_interval_ms=0"
  };

  opts.extra_master_flags = {
      // The number of columns requested may be over the max. In case it is,
      // adjust the max upwards.
      Substitute("--max_num_columns=$0", FLAGS_num_columns)
  };

  if (FLAGS_enable_fsync) {
    opts.extra_tserver_flags.emplace_back("--never_fsync=false");
    opts.extra_master_flags.emplace_back("--never_fsync=false");
  }

  // The test parameter selects whether data-at-rest encryption is enabled.
  if (GetParam()) {
    opts.extra_master_flags.emplace_back("--encrypt_data_at_rest=true");
    opts.extra_tserver_flags.emplace_back("--encrypt_data_at_rest=true");
  }

  // With the amount of data we're going to write, we need to make sure the
  // tserver has enough time to start back up (startup is only considered to be
  // "complete" when the tserver has loaded all fs metadata from disk).
  opts.start_process_timeout = MonoDelta::FromSeconds(
      std::max(FLAGS_num_seconds * 10.0, opts.start_process_timeout.ToSeconds()));

  NO_FATALS(StartClusterWithOpts(std::move(opts)));

  TestWorkload workload(cluster_.get());
  workload.set_num_replicas(1);

  // Use a custom schema with the number of requested columns: one INT32
  // primary key column plus (num_columns - 1) non-nullable INT32 columns.
  KuduSchema schema;
  KuduSchemaBuilder b;
  b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
  for (int i = 1; i < FLAGS_num_columns; i++) {
    b.AddColumn(Substitute("i$0", i))->Type(KuduColumnSchema::INT32)->NotNull();
  }
  // Fail just this test (rather than aborting the whole test binary) if the
  // schema cannot be built, consistent with the other checks in this test.
  ASSERT_OK(b.Build(&schema));
  workload.set_schema(schema);

  workload.set_num_tablets(FLAGS_num_tablets);

  // These values are largely arbitrary. Experimentation revealed that they
  // tend to produce the greatest number of containers/blocks.
  workload.set_num_write_threads(1);
  workload.set_write_batch_size(1000);

  // The workload is likely to slow down and time out individual writes. Let's
  // not let that fail the test.
  workload.set_timeout_allowed(true);

  // Run the workload for the specified time period.
  workload.Setup();
  workload.Start();
  SleepFor(MonoDelta::FromSeconds(FLAGS_num_seconds));
  workload.StopAndJoin();

  // Collect some interesting metrics. The cluster is shut down before the
  // metrics are logged so that they're easier to find in the log output.
  vector<pair<string, int64_t>> metrics;
  vector<GaugePrototype<uint64>*> metric_prototypes;
  if (FLAGS_block_manager == "log") {
    metric_prototypes = { &METRIC_log_block_manager_blocks_under_management,
                          &METRIC_log_block_manager_bytes_under_management,
                          &METRIC_log_block_manager_containers,
                          &METRIC_log_block_manager_full_containers,
                          &METRIC_block_manager_total_disk_sync,
                          &METRIC_threads_running };
  } else {
    metric_prototypes = { &METRIC_block_manager_total_bytes_written,
                          &METRIC_block_manager_total_blocks_created,
                          &METRIC_block_manager_total_disk_sync,
                          &METRIC_threads_running };
  }
  for (const auto* m : metric_prototypes) {
    int64_t value = 0;
    ASSERT_OK(itest::GetInt64Metric(
        cluster_->tablet_server(0)->bound_http_hostport(), &METRIC_ENTITY_server,
        "kudu.tabletserver", m, "value", &value));
    metrics.emplace_back(m->name(), value);
  }
  cluster_->Shutdown();

  // Start the cluster back up and measure how long it takes.

  // If requested, call sync() to force all dirty data to be written out.
  if (FLAGS_measure_startup_sync) {
    LOG_TIMING(INFO, "calling sync()") {
      sync();
    }
  }

  // If requested, force the kernel to drop its inode/dentry caches.
  if (FLAGS_measure_startup_drop_caches) {
    LOG_TIMING(INFO, "dropping kernel caches") {
      unique_ptr<WritableFile> f;
      // Named distinctly from the (already moved-from) cluster options above
      // to avoid shadowing them.
      WritableFileOptions file_opts;
      file_opts.mode = Env::MUST_EXIST;
      ASSERT_OK(env_->NewWritableFile(file_opts, "/proc/sys/vm/drop_caches", &f));
      ASSERT_OK(f->Append("3\n"));
      ASSERT_OK(f->Close());
    }
  }

  LOG_TIMING(INFO, "restarting master") {
    ASSERT_OK(cluster_->master()->Restart());
  }
  LOG_TIMING(INFO, "restarting tserver") {
    ASSERT_OK(cluster_->tablet_server(0)->Restart());
  }
  if (FLAGS_measure_startup_wait_for_bootstrap) {
    LOG_TIMING(INFO, "bootstrapping tablets") {
      LOG(INFO) << "waiting for tablets running";
      // Propagate a failure or timeout instead of silently discarding the
      // returned status; otherwise the logged timing would be reported as if
      // bootstrapping had succeeded.
      ASSERT_OK(cluster_->WaitForTabletsRunning(cluster_->tablet_server(0),
                                                FLAGS_num_tablets,
                                                MonoDelta::FromSeconds(3600)));
    }
  } else {
    LOG(INFO) << "not waiting for bootstrapping tablets (flag disabled)";
  }
  cluster_->Shutdown();

  // Emit the metrics gathered before the restart, now that the cluster is
  // down and no longer producing log output of its own.
  for (const auto& p : metrics) {
    LOG(INFO) << p.first << ": " << p.second;
  }
}

} // namespace kudu
