// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "kudu/fs/data_dirs.h"

#include <cmath>
#include <cstdint>
#include <memory>
#include <ostream>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include <gflags/gflags_declare.h>
#include <glog/logging.h>
#include <gtest/gtest.h>

#include "kudu/fs/block_manager.h"
#include "kudu/fs/dir_manager.h"
#include "kudu/fs/fs.pb.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/barrier.h"
#include "kudu/util/env.h"
#include "kudu/util/env_util.h"
#include "kudu/util/metrics.h"
#include "kudu/util/path_util.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"

using std::set;
using std::string;
using std::thread;
using std::vector;
using std::unique_ptr;
using std::unordered_set;
using strings::Substitute;

DECLARE_bool(crash_on_eio);
DECLARE_double(env_inject_eio);
DECLARE_double(env_inject_full);
DECLARE_int32(fs_data_dirs_available_space_cache_seconds);
DECLARE_int32(fs_target_data_dirs_per_tablet);
DECLARE_int64(disk_reserved_bytes_free_for_testing);
DECLARE_int64(fs_data_dirs_reserved_bytes);
DECLARE_string(env_inject_eio_globs);
DECLARE_string(env_inject_full_globs);

METRIC_DECLARE_gauge_uint64(data_dirs_failed);

namespace kudu {
namespace fs {

using internal::DataDirGroup;

static const char* kDirNamePrefix = "test_data_dir";
static const int kNumDirs = 10;

class DataDirsTest : public KuduTest {
 public:
  DataDirsTest() :
      test_tablet_name_("test_tablet"),
      test_block_opts_(CreateBlockOptions({ test_tablet_name_ })),
      entity_(METRIC_ENTITY_server.Instantiate(&registry_, "test")) {}

  virtual void SetUp() override {
    KuduTest::SetUp();
    FLAGS_fs_target_data_dirs_per_tablet = kNumDirs / 2 + 1;
    DataDirManagerOptions opts;
    opts.metric_entity = entity_;
    ASSERT_OK(DataDirManager::CreateNewForTests(
        env_, GetDirNames(kNumDirs), opts, &dd_manager_));
  }

 protected:
  vector<string> GetDirNames(int num_dirs) {
    vector<string> ret;
    for (int i = 0; i < num_dirs; i++) {
      string dir_name = Substitute("$0-$1", kDirNamePrefix, i);
      ret.push_back(GetTestPath(dir_name));
      bool created;
      CHECK_OK(env_util::CreateDirIfMissing(env_, ret[i], &created));
    }
    return ret;
  }

  const string test_tablet_name_;
  const CreateBlockOptions test_block_opts_;
  MetricRegistry registry_;
  scoped_refptr<MetricEntity> entity_;
  std::unique_ptr<DataDirManager> dd_manager_;
};

TEST_F(DataDirsTest, TestCreateGroup) {
  // Test that the DataDirManager doesn't know about the tablets we're about
  // to insert.
  Dir* dd = nullptr;
  Status s = dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dd);
  ASSERT_EQ(nullptr, dd);
  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
  ASSERT_STR_CONTAINS(s.ToString(), "Tried to get directory but no directory group "
                                    "registered for tablet");

  DataDirGroupPB orig_pb;
  ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
  ASSERT_OK(dd_manager_->GetDataDirGroupPB(test_tablet_name_, &orig_pb));

  // Ensure that the DataDirManager will not create a group for a tablet that
  // it already knows about.
  s = dd_manager_->CreateDataDirGroup(test_tablet_name_);
  ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
  ASSERT_STR_CONTAINS(s.ToString(), "Tried to create directory group for tablet "
                                    "but one is already registered");
  DataDirGroupPB pb;
  ASSERT_OK(dd_manager_->GetDataDirGroupPB(test_tablet_name_, &pb));

  // Verify that the data directory is unchanged after failing to create an
  // existing tablet.
  for (int i = 0; i < pb.uuids().size(); i++) {
    ASSERT_EQ(orig_pb.uuids(i), pb.uuids(i));
  }
  ASSERT_EQ(orig_pb.uuids().size(), pb.uuids().size());

  // Check that the tablet's DataDirGroup spans the right number of dirs.
  int num_dirs_with_tablets = 0;
  for (const auto& e: dd_manager_->tablets_by_uuid_idx_map_) {
    if (!e.second.empty()) {
      ASSERT_EQ(1, e.second.size());
      num_dirs_with_tablets++;
    }
  }
  ASSERT_EQ(FLAGS_fs_target_data_dirs_per_tablet, num_dirs_with_tablets);

  // Try to use the group.
  ASSERT_OK(dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dd));
  ASSERT_FALSE(dd->is_full());
}

TEST_F(DataDirsTest, TestLoadFromPB) {
  // Create a PB, delete the group, then load the group from the PB.
  DataDirGroupPB orig_pb;
  ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
  ASSERT_OK(dd_manager_->GetDataDirGroupPB(test_tablet_name_, &orig_pb));
  dd_manager_->DeleteDataDirGroup(test_tablet_name_);
  ASSERT_OK(dd_manager_->LoadDataDirGroupFromPB(test_tablet_name_, orig_pb));

  // Check that the tablet's DataDirGroup spans the right number of dirs.
  int num_dirs_with_tablets = 0;
  for (const auto& e: dd_manager_->tablets_by_uuid_idx_map_) {
    if (!e.second.empty()) {
      ASSERT_EQ(1, e.second.size());
      num_dirs_with_tablets++;
    }
  }
  ASSERT_EQ(FLAGS_fs_target_data_dirs_per_tablet, num_dirs_with_tablets);

  // Ensure that loading from a PB will fail if the DataDirManager already
  // knows about the tablet.
  Status s = dd_manager_->LoadDataDirGroupFromPB(test_tablet_name_, orig_pb);
  ASSERT_TRUE(s.IsAlreadyPresent()) << s.ToString();
  ASSERT_STR_CONTAINS(s.ToString(), "tried to load directory group for tablet");
}

TEST_F(DataDirsTest, TestDeleteDataDirGroup) {
  ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
  Dir* dd;
  ASSERT_OK(dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dd));
  ASSERT_FALSE(dd->is_full());
  dd_manager_->DeleteDataDirGroup(test_tablet_name_);
  Status s = dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dd);
  ASSERT_FALSE(dd->is_full());
  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
  ASSERT_STR_CONTAINS(s.ToString(), "Tried to get directory but no directory group "
                                    "registered for tablet");
}

// Inject full disk errors a couple different ways and make sure that we can't
// create directory groups.
TEST_F(DataDirsTest, TestFullDisk) {
  FLAGS_env_inject_full = 1.0;
  Status s = dd_manager_->CreateDataDirGroup(test_tablet_name_);
  ASSERT_TRUE(s.IsIOError()) << s.ToString();
  ASSERT_STR_CONTAINS(s.ToString(), "All healthy data directories are full");
  FLAGS_env_inject_full = 0;

  FLAGS_fs_data_dirs_available_space_cache_seconds = 0; // Don't cache device available space.
  FLAGS_fs_data_dirs_reserved_bytes = 1;                // Reserved space.
  FLAGS_disk_reserved_bytes_free_for_testing = 0;       // Free space.

  s = dd_manager_->CreateDataDirGroup(test_tablet_name_);
  ASSERT_TRUE(s.IsIOError()) << s.ToString();
  ASSERT_STR_CONTAINS(s.ToString(), "All healthy data directories are full");
}

// Test that when an entire directory group is full, new directories are added
// to the group.
TEST_F(DataDirsTest, TestFullDiskGrowsGroup) {
  // First, create a directory group.
  ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));

  // Mark every directory in the directory group full.
  vector<string> data_dir_group;
  ASSERT_OK(dd_manager_->FindDataDirsByTabletId(test_tablet_name_, &data_dir_group));
  FLAGS_fs_data_dirs_available_space_cache_seconds = 0;
  FLAGS_env_inject_full_globs = JoinStrings(data_dir_group, ",");
  FLAGS_env_inject_full = 1.0;

  // Try getting a new directory, adding if necessary.
  Dir* new_dir;
  ASSERT_OK(dd_manager_->GetDirAddIfNecessary(test_block_opts_, &new_dir));
  unordered_set<string> old_dirs(data_dir_group.begin(), data_dir_group.end());
  ASSERT_FALSE(ContainsKey(old_dirs, new_dir->dir()));

  // The selected directory should have been added to the data directory group.
  vector<string> new_dir_group;
  ASSERT_OK(dd_manager_->FindDataDirsByTabletId(test_tablet_name_, &new_dir_group));
  unordered_set<string> new_dirs(new_dir_group.begin(), new_dir_group.end());
  ASSERT_TRUE(ContainsKey(new_dirs, new_dir->dir()));

  // If all directories are full, we shouldn't be able to get a new directory.
  FLAGS_env_inject_full_globs = "*";
  Status s = dd_manager_->GetDirAddIfNecessary(test_block_opts_, &new_dir);
  ASSERT_STR_CONTAINS(s.ToString(), "No directories available");
  ASSERT_TRUE(s.IsIOError()) << s.ToString();
}

// Test that concurrently adding dirs to a data dir group yields the expected
// number of dirs added.
TEST_F(DataDirsTest, TestGrowGroupInParallel) {
  // Set up a data dir group with no space so we can add to it.
  ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
  vector<string> data_dir_group;
  ASSERT_OK(dd_manager_->FindDataDirsByTabletId(test_tablet_name_, &data_dir_group));
  FLAGS_fs_data_dirs_available_space_cache_seconds = 0;
  FLAGS_env_inject_full_globs = JoinStrings(data_dir_group, ",");
  FLAGS_env_inject_full = 1.0;

  // In parallel, try adding a directory to the group.
  const int kNumThreads = 32;
  vector<thread> threads;
  vector<Status> statuses(kNumThreads);
  vector<Dir*> dds(kNumThreads);
  Barrier b(kNumThreads);
  for (int i = 0; i < kNumThreads; i++) {
    threads.emplace_back([&, i] {
      b.Wait();
      statuses[i] = dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dds[i]);
    });
  }
  for (auto& t : threads) {
    t.join();
  }
  for (const auto& s : statuses) {
    EXPECT_OK(s);
  }
  // Check that we added a directory.
  unordered_set<string> old_dirs(data_dir_group.begin(), data_dir_group.end());
  ASSERT_OK(dd_manager_->FindDataDirsByTabletId(test_tablet_name_, &data_dir_group));
  ASSERT_EQ(FLAGS_fs_target_data_dirs_per_tablet + 1, data_dir_group.size());
  Dir* new_dir = dds[0];
  ASSERT_FALSE(ContainsKey(old_dirs, new_dir->dir()));

  // All returned data directories should have been the newly added one.
  for (int i = 1; i < kNumThreads; i++) {
    ASSERT_EQ(new_dir, dds[i]);
  }
}

TEST_F(DataDirsTest, TestFailedDirNotReturned) {
  FLAGS_fs_target_data_dirs_per_tablet = 2;
  ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
  Dir* dd;
  Dir* failed_dd;
  int uuid_idx;
  // Fail one of the directories in the group and verify that it is not used.
  ASSERT_OK(dd_manager_->GetDirAddIfNecessary(test_block_opts_, &failed_dd));
  ASSERT_TRUE(dd_manager_->FindUuidIndexByDir(failed_dd, &uuid_idx));
  // These calls are idempotent.
  ASSERT_OK(dd_manager_->MarkDirFailed(uuid_idx));
  ASSERT_OK(dd_manager_->MarkDirFailed(uuid_idx));
  ASSERT_OK(dd_manager_->MarkDirFailed(uuid_idx));
  ASSERT_EQ(1, down_cast<AtomicGauge<uint64_t>*>(
        entity_->FindOrNull(METRIC_data_dirs_failed).get())->value());
  for (int i = 0; i < 10; i++) {
    ASSERT_OK(dd_manager_->GetDirAddIfNecessary(test_block_opts_, &dd));
    ASSERT_NE(dd, failed_dd);
  }

  // Fail the other directory and verify that neither will be used.
  ASSERT_TRUE(dd_manager_->FindUuidIndexByDir(dd, &uuid_idx));
  ASSERT_OK(dd_manager_->MarkDirFailed(uuid_idx));
  ASSERT_EQ(2, down_cast<AtomicGauge<uint64_t>*>(
        entity_->FindOrNull(METRIC_data_dirs_failed).get())->value());
  Status s = dd_manager_->GetDirAddIfNecessary(test_block_opts_, &failed_dd);
  ASSERT_TRUE(s.IsIOError()) << s.ToString();
  ASSERT_STR_CONTAINS(s.ToString(), "No healthy directories exist in tablet's directory group");
}

TEST_F(DataDirsTest, TestFailedDirNotAddedToGroup) {
  // Fail one dir and create a group with all directories. The failed directory
  // shouldn't be in the group.
  FLAGS_fs_target_data_dirs_per_tablet = kNumDirs;
  ASSERT_OK(dd_manager_->MarkDirFailed(0));
  ASSERT_EQ(1, down_cast<AtomicGauge<uint64_t>*>(
        entity_->FindOrNull(METRIC_data_dirs_failed).get())->value());
  ASSERT_OK(dd_manager_->CreateDataDirGroup(test_tablet_name_));
  DataDirGroupPB pb;
  ASSERT_OK(dd_manager_->GetDataDirGroupPB(test_tablet_name_, &pb));
  ASSERT_EQ(kNumDirs - 1, pb.uuids_size());

  // Check that all uuid_indices are valid and are not in the failed directory
  // (uuid_idx 0).
  for (const string& uuid : pb.uuids()) {
    int* uuid_idx = FindOrNull(dd_manager_->idx_by_uuid_, uuid);
    ASSERT_NE(nullptr, uuid_idx);
    ASSERT_NE(0, *uuid_idx);
  }
  dd_manager_->DeleteDataDirGroup(test_tablet_name_);

  for (int i = 1; i < kNumDirs - 1; i++) {
    ASSERT_OK(dd_manager_->MarkDirFailed(i));
  }
  Status s = dd_manager_->MarkDirFailed(kNumDirs - 1);
  ASSERT_STR_CONTAINS(s.ToString(), "All dirs have failed");
  ASSERT_TRUE(s.IsIOError()) << s.ToString();

  s = dd_manager_->CreateDataDirGroup(test_tablet_name_);
  ASSERT_STR_CONTAINS(s.ToString(), "No healthy data directories available");
  ASSERT_TRUE(s.IsIOError()) << s.ToString();
}

TEST_F(DataDirsTest, TestLoadBalancingDistribution) {
  FLAGS_fs_target_data_dirs_per_tablet = 3;
  const double kNumTablets = 20;

  // Add 'kNumTablets' tablets, each with groups of size
  // 'FLAGS_fs_target_data_dirs_per_tablet'.
  for (int tablet_idx = 0; tablet_idx < kNumTablets; tablet_idx++) {
    ASSERT_OK(dd_manager_->CreateDataDirGroup(Substitute("$0-$1", test_tablet_name_, tablet_idx)));
  }
  const double kMeanTabletsPerDir = kNumTablets * FLAGS_fs_target_data_dirs_per_tablet / kNumDirs;

  // Calculate the standard deviation of the number of tablets per disk.
  // If tablets are evenly spread across directories, this should be small.
  double sum_squared_dev = 0;
  for (const auto& e : dd_manager_->tablets_by_uuid_idx_map_) {
    LOG(INFO) << Substitute("$0 is storing data from $1 tablets.",
        dd_manager_->dir_by_uuid_idx_[e.first]->dir(), e.second.size());
    double deviation = static_cast<double>(e.second.size()) - kMeanTabletsPerDir;
    sum_squared_dev += deviation * deviation;
  }
  double stddev = sqrt(sum_squared_dev / kNumDirs);
  LOG(INFO) << Substitute("$0 tablets stored across $1 directories.", kNumTablets, kNumDirs);

  // Looping this 1000 times yielded a couple stddev values over 2.0. A high
  // standard deviation does not necessarily reveal an error, but does indicate
  // a relatively unlikely distribution of data directory groups.
  LOG(INFO) << "Standard deviation: " << stddev;

}

TEST_F(DataDirsTest, TestLoadBalancingBias) {
  // Shows that block placement will tend to favor directories with less load.
  // First add a set of tablets for skew. Then add more tablets and check that
  // there's still roughly a uniform distribution.
  FLAGS_fs_target_data_dirs_per_tablet = 5;

  // Start with one data directory group that has some tablets.
  const double kTabletsPerSkewedDir = 10;
  const int kNumSkewedDirs = FLAGS_fs_target_data_dirs_per_tablet;

  // Number of tablets (pre-replication) added after the skew tablets.
  // This configuration will proceed with 10 directories, and a tablet load
  // of 20 * 5, for an mean of 10 tablets associated with each dir.
  const double kNumAdditionalTablets = 10;
  const string kSkewTabletPrefix = "skew_tablet";

  // Manually create a group for the skewed directories.
  //
  // Note: this should not happen in the wild and is used here as a way to
  // introduce some initial skew to the distribution.
  auto uuid_idx_iter = dd_manager_->tablets_by_uuid_idx_map_.begin();
  vector<int> skewed_dir_indices;
  for (int i = 0; i < kNumSkewedDirs; i++) {
    int uuid_idx = uuid_idx_iter->first;
    skewed_dir_indices.push_back(uuid_idx);
    uuid_idx_iter++;
  }

  // Add tablets to each skewed directory.
  for (int skew_tablet_idx = 0; skew_tablet_idx < kTabletsPerSkewedDir; skew_tablet_idx++) {
    string skew_tablet = Substitute("$0-$1", kSkewTabletPrefix, skew_tablet_idx);
    InsertOrDie(&dd_manager_->group_by_tablet_map_, skew_tablet, DataDirGroup(skewed_dir_indices));
    for (int uuid_idx : skewed_dir_indices) {
      InsertOrDie(&FindOrDie(dd_manager_->tablets_by_uuid_idx_map_, uuid_idx), skew_tablet);
    }
  }

  // Add the additional tablets.
  for (int tablet_idx = 0; tablet_idx < kNumAdditionalTablets; tablet_idx++) {
    ASSERT_OK(dd_manager_->CreateDataDirGroup(Substitute("$0-$1", test_tablet_name_, tablet_idx)));
  }

  // Calculate the standard deviation of the number of tablets per disk.
  double sum_squared_dev = 0;
  const double kMeanTabletsPerDir = (kTabletsPerSkewedDir * kNumSkewedDirs +
      kNumAdditionalTablets * FLAGS_fs_target_data_dirs_per_tablet) / kNumDirs;
  for (const auto& e : dd_manager_->tablets_by_uuid_idx_map_) {
    LOG(INFO) << Substitute("$0 is storing data from $1 tablets.",
        dd_manager_->dir_by_uuid_idx_[e.first]->dir(), e.second.size());
    double deviation = static_cast<double>(e.second.size()) - kMeanTabletsPerDir;
    sum_squared_dev += deviation * deviation;
  }
  double stddev = sqrt(sum_squared_dev / kNumDirs);

  // Looping this 5000 times yielded no stddev values higher than 5.0.
  LOG(INFO) << "Standard deviation: " << stddev;

  // Since the skewed directories start out with 10 tablets, a block-placement
  // heuristic that only takes into account tablet load would fail to add more
  // tablets to these skewed directories, as it would lead to all directories
  // having the mean, 10, tablets. Instead, the block-placement heuristic should
  // not completely ignore the initially skewed dirs.
  bool some_added_to_skewed_dirs = false;
  for (int skewed_uuid_index : skewed_dir_indices) {
    set<string>* tablets = FindOrNull(dd_manager_->tablets_by_uuid_idx_map_, skewed_uuid_index);
    ASSERT_NE(nullptr, tablets);
    if (tablets->size() > kTabletsPerSkewedDir) {
      some_added_to_skewed_dirs = true;
    }
  }
  ASSERT_TRUE(some_added_to_skewed_dirs);
}

class DataDirManagerTest : public DataDirsTest {
 public:
  void SetUp() override {
    test_roots_ = JoinPathSegmentsV(GetDirNames(GetNumDirs()), "root");

    // Don't call DataDirsTest::SetUp() to avoid creating the default directory
    // manager.
    KuduTest::SetUp();
  }

  Status OpenDataDirManager() {
    return DataDirManager::OpenExistingForTests(env_, test_roots_, {}, &dd_manager_);
  }

  virtual int GetNumDirs() const { return kNumDirs; }

 protected:
  // The test roots. Data will be placed in a data directory within the root.
  // E.g. Test root: /test_data_dir_0/root, Data dir: /test_data_dir_0/root/data
  vector<string> test_roots_;
};

// Test ensuring that the directory manager can be opened with failed disks,
// provided it was successfully created.
TEST_F(DataDirManagerTest, TestOpenWithFailedDirs) {
  // Create the roots if necessary. This will happen in the FsManager currently.
  for (const string& test_root : test_roots_) {
    ASSERT_OK(env_->CreateDir(test_root));
  }
  ASSERT_OK(DataDirManager::CreateNewForTests(env_, test_roots_, {}, &dd_manager_));

  // Kill the first directory.
  FLAGS_crash_on_eio = false;
  FLAGS_env_inject_eio = 1.0;
  FLAGS_env_inject_eio_globs = JoinPathSegments(test_roots_[0], "**");

  // The directory manager will successfully open with the single failed directory.
  ASSERT_OK(OpenDataDirManager());
  set<int> failed_dirs;
  ASSERT_EQ(1, dd_manager_->GetFailedDirs().size());

  // Now fail almost all of the other directories, leaving the last intact.
  for (int i = 1; i < kNumDirs - 1; i++) {
    FLAGS_env_inject_eio_globs = Substitute("$0,$1", FLAGS_env_inject_eio_globs,
                                            JoinPathSegments(test_roots_[i], "**"));
  }
  // The directory manager should be aware of the new failures.
  ASSERT_OK(OpenDataDirManager());
  ASSERT_EQ(kNumDirs - 1, dd_manager_->GetFailedDirs().size());

  // Ensure that when there are no healthy data directories, the open will
  // yield an error.
  FLAGS_env_inject_eio_globs = JoinStrings(JoinPathSegmentsV(test_roots_, "**"), ",");
  Status s = DataDirManager::OpenExistingForTests(env_, test_roots_,
      {}, &dd_manager_);
  ASSERT_STR_CONTAINS(s.ToString(), "could not open directory manager");
  ASSERT_TRUE(s.IsNotFound()) << s.ToString();
}

class TooManyDataDirManagerTest : public DataDirManagerTest {
 public:
  // TraceMetrics::g_intern_map has a limited number of entries, and each data
  // dir used to consume three of them via its threadpool. This value was just
  // enough to exceed the map's capacity.
  int GetNumDirs() const override { return 34; }
};

// Regression test for KUDU-2194.
TEST_F(TooManyDataDirManagerTest, TestTooManyInternedStrings) {
  for (const auto& r : test_roots_) {
    ASSERT_OK(env_->CreateDir(r));
  }
  ASSERT_OK(DataDirManager::CreateNewForTests(env_, test_roots_, {}, &dd_manager_));
}

} // namespace fs
} //namespace kudu
