// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <cstdio>
#include <cstdlib>
#include <limits>
#include <numeric>

#include <boost/filesystem.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/locks.hpp>
#include <gtest/gtest.h>

#include "common/init.h"
#include "runtime/io/request-context.h"
#include "runtime/test-env.h"
#include "runtime/tmp-file-mgr-internal.h"
#include "runtime/tmp-file-mgr.h"
#include "service/fe-support.h"
#include "testutil/gtest-util.h"
#include "util/condition-variable.h"
#include "util/cpu-info.h"
#include "util/filesystem-util.h"
#include "util/collection-metrics.h"
#include "util/metrics.h"

#include "gen-cpp/Types_types.h"  // for TUniqueId

#include "common/names.h"

using boost::filesystem::path;

DECLARE_bool(disk_spill_encryption);
#ifndef NDEBUG
DECLARE_int32(stress_scratch_write_delay_ms);
#endif

namespace impala {

using namespace io;

// Binary byte-size units (each is 1024x the previous one), used to express expected
// scratch directory limits.
static constexpr int64_t KILOBYTE = int64_t{1} << 10;
static constexpr int64_t MEGABYTE = KILOBYTE << 10;
static constexpr int64_t GIGABYTE = MEGABYTE << 10;
static constexpr int64_t TERABYTE = GIGABYTE << 10;

class TmpFileMgrTest : public ::testing::Test {
 public:
  virtual void SetUp() {
    metrics_.reset(new MetricGroup("tmp-file-mgr-test"));
    profile_ = RuntimeProfile::Create(&obj_pool_, "tmp-file-mgr-test");
    test_env_.reset(new TestEnv);
    ASSERT_OK(test_env_->Init());
    cb_counter_ = 0;

    // Reset query options that are modified by tests.
    FLAGS_disk_spill_encryption = false;
#ifndef NDEBUG
    FLAGS_stress_scratch_write_delay_ms = 0;
#endif
  }

  virtual void TearDown() {
    test_env_.reset();
    metrics_.reset();
    obj_pool_.Clear();
  }

  DiskIoMgr* io_mgr() { return test_env_->exec_env()->disk_io_mgr(); }

  /// Helper to create a TmpFileMgr and initialise it with InitCustom(). Adds the mgr to
  /// 'obj_pool_' for automatic cleanup at the end of each test. Fails the test if
  /// InitCustom() fails.
  TmpFileMgr* CreateTmpFileMgr(const string& tmp_dirs_spec) {
    // Allocate a new metrics group for each TmpFileMgr so they don't get confused by
    // the pre-existing metrics (TmpFileMgr assumes it's a singleton in product code).
    MetricGroup* metrics = obj_pool_.Add(new MetricGroup(""));
    TmpFileMgr* mgr = obj_pool_.Add(new TmpFileMgr());
    EXPECT_OK(mgr->InitCustom(tmp_dirs_spec, false, metrics));
    return mgr;
  }

  /// Check that metric values are consistent with TmpFileMgr state.
  void CheckMetrics(TmpFileMgr* tmp_file_mgr) {
    vector<TmpFileMgr::DeviceId> active = tmp_file_mgr->ActiveTmpDevices();
    IntGauge* active_metric =
        metrics_->FindMetricForTesting<IntGauge>("tmp-file-mgr.active-scratch-dirs");
    EXPECT_EQ(active.size(), active_metric->GetValue());
    SetMetric<string>* active_set_metric =
        metrics_->FindMetricForTesting<SetMetric<string>>(
        "tmp-file-mgr.active-scratch-dirs.list");
    set<string> active_set = active_set_metric->value();
    EXPECT_EQ(active.size(), active_set.size());
    for (int i = 0; i < active.size(); ++i) {
      string tmp_dir_path = tmp_file_mgr->GetTmpDirPath(active[i]);
      EXPECT_TRUE(active_set.find(tmp_dir_path) != active_set.end());
    }
  }

  /// Check that current scratch space bytes and HWM match the expected values.
  void checkHWMMetrics(int64_t exp_current_value, int64_t exp_hwm_value) {
    AtomicHighWaterMarkGauge* hwm_value =
        metrics_->FindMetricForTesting<AtomicHighWaterMarkGauge>(
            "tmp-file-mgr.scratch-space-bytes-used-high-water-mark");
    IntGauge* current_value = hwm_value->current_value_;
    ASSERT_EQ(current_value->GetValue(), exp_current_value);
    ASSERT_EQ(hwm_value->GetValue(), exp_hwm_value);
  }

  void RemoveAndCreateDirs(const vector<string>& dirs) {
    for (const string& dir: dirs) {
      ASSERT_OK(FileSystemUtil::RemoveAndCreateDirectory(dir));
    }
  }

  /// Helper to call the private CreateFiles() method and return
  /// the created files.
  static Status CreateFiles(
      TmpFileMgr::FileGroup* group, vector<TmpFileMgr::File*>* files) {
    // The method expects the lock to be held.
    lock_guard<SpinLock> lock(group->lock_);
    RETURN_IF_ERROR(group->CreateFiles());
    for (unique_ptr<TmpFileMgr::File>& file : group->tmp_files_) {
      files->push_back(file.get());
    }
    return Status::OK();
  }

  /// Helper to get the private tmp_dirs_ member.
  static const vector<TmpFileMgr::TmpDir>& GetTmpDirs(TmpFileMgr* mgr) {
    return mgr->tmp_dirs_;
  }

  /// Helper to call the private TmpFileMgr::NewFile() method.
  static void NewFile(TmpFileMgr* mgr, TmpFileMgr::FileGroup* group,
      TmpFileMgr::DeviceId device_id, unique_ptr<TmpFileMgr::File>* new_file) {
    mgr->NewFile(group, device_id, new_file);
  }

  /// Helper to call the private File::AllocateSpace() method.
  static void FileAllocateSpace(
      TmpFileMgr::File* file, int64_t num_bytes, int64_t* offset) {
    file->AllocateSpace(num_bytes, offset);
  }

  /// Helper to call the private FileGroup::AllocateSpace() method.
  static Status GroupAllocateSpace(TmpFileMgr::FileGroup* group, int64_t num_bytes,
      TmpFileMgr::File** file, int64_t* offset) {
    return group->AllocateSpace(num_bytes, file, offset);
  }

  /// Helper to set FileGroup::next_allocation_index_.
  static void SetNextAllocationIndex(TmpFileMgr::FileGroup* group, int value) {
    group->next_allocation_index_ = value;
  }

  /// Helper to cancel the FileGroup RequestContext.
  static void CancelIoContext(TmpFileMgr::FileGroup* group) {
    group->io_ctx_->Cancel();
  }

  /// Helper to get the # of bytes allocated by the group. Validates that the sum across
  /// all files equals this total.
  static int64_t BytesAllocated(TmpFileMgr::FileGroup* group) {
    int64_t bytes_allocated = 0;
    for (unique_ptr<TmpFileMgr::File>& file : group->tmp_files_) {
      bytes_allocated += file->bytes_allocated_;
    }
    EXPECT_EQ(bytes_allocated, group->current_bytes_allocated_);
    return bytes_allocated;
  }

  /// Helpers to call WriteHandle methods.
  void Cancel(TmpFileMgr::WriteHandle* handle) { handle->Cancel(); }
  void WaitForWrite(TmpFileMgr::WriteHandle* handle) {
    handle->WaitForWrite();
  }

  // Write callback, which signals 'cb_cv_' and increments 'cb_counter_'.
  void SignalCallback(Status write_status) {
    {
      lock_guard<mutex> lock(cb_cv_lock_);
      ++cb_counter_;
    }
    cb_cv_.NotifyAll();
  }

  /// Wait until 'cb_counter_' reaches 'val'.
  void WaitForCallbacks(int64_t val) {
    unique_lock<mutex> lock(cb_cv_lock_);
    while (cb_counter_ < val) cb_cv_.Wait(lock);
  }

  /// Implementation of TestBlockVerification(), which is run with different environments.
  void TestBlockVerification();

  ObjectPool obj_pool_;
  scoped_ptr<MetricGroup> metrics_;
  // Owned by 'obj_pool_'.
  RuntimeProfile* profile_;

  /// Used for DiskIoMgr.
  scoped_ptr<TestEnv> test_env_;

  // Variables used by SignalCallback().
  mutex cb_cv_lock_;
  ConditionVariable cb_cv_;
  int64_t cb_counter_;
};

/// Regression test for IMPALA-2160. Verify that temporary file manager allocates blocks
/// at the expected file offsets.
TEST_F(TmpFileMgrTest, TestFileAllocation) {
  TmpFileMgr tmp_file_mgr;
  ASSERT_OK(tmp_file_mgr.Init(metrics_.get()));
  TUniqueId id;
  TmpFileMgr::FileGroup file_group(
      &tmp_file_mgr, io_mgr(), profile_, id, 1024 * 1024 * 8);

  // Default configuration should give us one temporary device.
  EXPECT_EQ(1, tmp_file_mgr.NumActiveTmpDevices());
  vector<TmpFileMgr::DeviceId> tmp_devices = tmp_file_mgr.ActiveTmpDevices();
  EXPECT_EQ(1, tmp_devices.size());
  vector<TmpFileMgr::File*> files;
  ASSERT_OK(CreateFiles(&file_group, &files));
  // Must hold before files[0] is dereferenced below.
  ASSERT_EQ(1, files.size());
  TmpFileMgr::File* file = files[0];
  // Apply allocations of variable sizes and check that each one starts where the
  // previous one ended.
  const int64_t write_sizes[] = {
      1, 10, 1024, 4, 1024 * 1024 * 8, 1024 * 1024 * 8, 16, 10};
  int64_t next_offset = 0;
  for (int64_t write_size : write_sizes) {
    int64_t offset;
    FileAllocateSpace(file, write_size, &offset);
    EXPECT_EQ(next_offset, offset);
    next_offset = offset + write_size;
  }
  // Allocation alone should not have created the file on disk. Save the path now:
  // 'file' is owned by 'file_group' and must not be used after Close().
  const string file_path = file->path();
  EXPECT_FALSE(boost::filesystem::exists(file_path));

  // Check that the file is cleaned up correctly. Need to create file first since
  // tmp file is only allocated on writes.
  EXPECT_OK(FileSystemUtil::CreateFile(file_path));
  file_group.Close();
  EXPECT_FALSE(boost::filesystem::exists(file_path));
  CheckMetrics(&tmp_file_mgr);
}

/// Test that we can do initialization with two directories on the same device and
/// that, when only one directory per device is allowed, only the first one is used.
TEST_F(TmpFileMgrTest, TestOneDirPerDevice) {
  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
  RemoveAndCreateDirs(tmp_dirs);
  TmpFileMgr tmp_file_mgr;
  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, metrics_.get()));
  TUniqueId id;
  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);

  // Only the first directory should be used.
  EXPECT_EQ(1, tmp_file_mgr.NumActiveTmpDevices());
  vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.ActiveTmpDevices();
  EXPECT_EQ(1, devices.size());
  vector<TmpFileMgr::File*> files;
  ASSERT_OK(CreateFiles(&file_group, &files));
  // Must hold before files[0] is dereferenced below.
  ASSERT_EQ(1, files.size());
  TmpFileMgr::File* file = files[0];
  // Check the prefix is the expected temporary directory.
  EXPECT_EQ(0, file->path().find(tmp_dirs[0]));
  ASSERT_OK(FileSystemUtil::RemovePaths(tmp_dirs));
  file_group.Close();
  CheckMetrics(&tmp_file_mgr);
}

/// Test that we can do custom initialization with two dirs on same device.
TEST_F(TmpFileMgrTest, TestMultiDirsPerDevice) {
  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
  RemoveAndCreateDirs(tmp_dirs);
  TmpFileMgr tmp_file_mgr;
  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()));
  TUniqueId id;
  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);

  // Both directories should be used.
  EXPECT_EQ(2, tmp_file_mgr.NumActiveTmpDevices());
  vector<TmpFileMgr::DeviceId> devices = tmp_file_mgr.ActiveTmpDevices();
  // devices[i] and files[i] are indexed below for i in [0, 2), so these sizes must hold.
  ASSERT_EQ(2, devices.size());

  vector<TmpFileMgr::File*> files;
  ASSERT_OK(CreateFiles(&file_group, &files));
  ASSERT_EQ(2, files.size());
  for (int i = 0; i < 2; ++i) {
    EXPECT_EQ(0, tmp_file_mgr.GetTmpDirPath(devices[i]).find(tmp_dirs[i]));
    // Check the prefix is the expected temporary directory.
    EXPECT_EQ(0, files[i]->path().find(tmp_dirs[i]));
  }
  ASSERT_OK(FileSystemUtil::RemovePaths(tmp_dirs));
  file_group.Close();
  CheckMetrics(&tmp_file_mgr);
}

/// Test that reporting a write error is possible but does not result in
/// blacklisting the device.
TEST_F(TmpFileMgrTest, TestReportError) {
  vector<string> scratch_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
  RemoveAndCreateDirs(scratch_dirs);
  TmpFileMgr tmp_file_mgr;
  ASSERT_OK(tmp_file_mgr.InitCustom(scratch_dirs, false, metrics_.get()));
  TUniqueId query_id;
  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, query_id);

  // Both configured directories are expected to be active up front.
  EXPECT_EQ(2, tmp_file_mgr.ActiveTmpDevices().size());
  CheckMetrics(&tmp_file_mgr);

  // Keep one file healthy and blacklist the other to simulate a reported write error.
  const int GOOD_DEVICE = 0;
  const int BAD_DEVICE = 1;
  vector<TmpFileMgr::File*> group_files;
  ASSERT_OK(CreateFiles(&file_group, &group_files));
  ASSERT_EQ(2, group_files.size());
  TmpFileMgr::File* healthy_file = group_files[GOOD_DEVICE];
  TmpFileMgr::File* failed_file = group_files[BAD_DEVICE];
  ErrorMsg fake_error(TErrorCode::GENERAL, "A fake error");
  failed_file->Blacklist(fake_error);

  // Blacklisting applies to the file only: the device stays active.
  EXPECT_TRUE(failed_file->is_blacklisted());
  EXPECT_EQ(2, tmp_file_mgr.NumActiveTmpDevices());
  EXPECT_EQ(2, tmp_file_mgr.ActiveTmpDevices().size());
  CheckMetrics(&tmp_file_mgr);

  // Both the blacklisted and the healthy file can still be extended, and a new file
  // can still be created on the device of the blacklisted one.
  int64_t offset;
  FileAllocateSpace(failed_file, 128, &offset);
  FileAllocateSpace(healthy_file, 128, &offset);
  unique_ptr<TmpFileMgr::File> extra_file_on_bad_device;
  NewFile(&tmp_file_mgr, &file_group, BAD_DEVICE, &extra_file_on_bad_device);
  ASSERT_OK(FileSystemUtil::RemovePaths(scratch_dirs));
  file_group.Close();
  CheckMetrics(&tmp_file_mgr);
}

// Test that space can still be allocated from existing scratch files after a scratch
// directory is made non-writable, since allocation does not touch the filesystem.
TEST_F(TmpFileMgrTest, TestAllocateNonWritable) {
  vector<string> tmp_dirs;
  vector<string> scratch_subdirs;
  for (int i = 0; i < 2; ++i) {
    tmp_dirs.push_back(Substitute("/tmp/tmp-file-mgr-test.$0", i));
    scratch_subdirs.push_back(tmp_dirs[i] + "/impala-scratch");
  }
  RemoveAndCreateDirs(tmp_dirs);
  TmpFileMgr tmp_file_mgr;
  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()));
  TUniqueId id;
  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);

  vector<TmpFileMgr::File*> allocated_files;
  ASSERT_OK(CreateFiles(&file_group, &allocated_files));
  // Both files are indexed below.
  ASSERT_EQ(2, allocated_files.size());
  int64_t offset;
  FileAllocateSpace(allocated_files[0], 1, &offset);

  // Make the first scratch subdirectory non-writable, then allocate from a file that
  // already has allocated space (allocated_files[0]) and from one that has none yet
  // (allocated_files[1]). No errors should be encountered during allocation since
  // allocation is purely logical. EXPECT (not ASSERT) is used for chmod so that the
  // cleanup below always runs.
  EXPECT_EQ(0, chmod(scratch_subdirs[0].c_str(), 0)) << scratch_subdirs[0];
  FileAllocateSpace(allocated_files[0], 1, &offset);
  FileAllocateSpace(allocated_files[1], 1, &offset);

  // Restore permissions before cleaning up the directories.
  EXPECT_EQ(0, chmod(scratch_subdirs[0].c_str(), S_IRWXU)) << scratch_subdirs[0];
  ASSERT_OK(FileSystemUtil::RemovePaths(tmp_dirs));
  file_group.Close();
}

// Test scratch limit is applied correctly to group of files.
TEST_F(TmpFileMgrTest, TestScratchLimit) {
  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
  RemoveAndCreateDirs(tmp_dirs);
  TmpFileMgr tmp_file_mgr;
  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()));

  const int64_t LIMIT = 128;
  // A power-of-two so that FileGroup allocates exactly this amount of scratch space.
  const int64_t ALLOC_SIZE = 64;
  TUniqueId id;
  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id, LIMIT);

  vector<TmpFileMgr::File*> files;
  ASSERT_OK(CreateFiles(&file_group, &files));
  // One file per scratch directory; files[0] and files[1] are compared against below.
  ASSERT_EQ(2, files.size());

  int64_t offset;
  TmpFileMgr::File* alloc_file;

  // Alloc from file 1 should succeed.
  SetNextAllocationIndex(&file_group, 0);
  ASSERT_OK(GroupAllocateSpace(&file_group, ALLOC_SIZE, &alloc_file, &offset));
  ASSERT_EQ(alloc_file, files[0]); // Should select files round-robin.
  ASSERT_EQ(0, offset);

  // Allocate up to the max.
  ASSERT_OK(GroupAllocateSpace(&file_group, ALLOC_SIZE, &alloc_file, &offset));
  ASSERT_EQ(0, offset);
  ASSERT_EQ(alloc_file, files[1]);

  // Test aggregate limit is enforced: the group is at LIMIT, so even one more byte fails.
  Status status = GroupAllocateSpace(&file_group, 1, &alloc_file, &offset);
  ASSERT_FALSE(status.ok());
  ASSERT_EQ(status.code(), TErrorCode::SCRATCH_LIMIT_EXCEEDED);
  ASSERT_NE(string::npos, status.msg().msg().find(GetBackendString()));

  // Check HWM metrics: current usage drops on Close() but the high-water mark remains.
  checkHWMMetrics(LIMIT, LIMIT);
  file_group.Close();
  checkHWMMetrics(0, LIMIT);
}

// Test that scratch file ranges of varying length are recycled as expected.
TEST_F(TmpFileMgrTest, TestScratchRangeRecycling) {
  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
  RemoveAndCreateDirs(tmp_dirs);
  TmpFileMgr tmp_file_mgr;
  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()));
  TUniqueId id;

  TmpFileMgr::FileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
  const int NUM_BLOCKS = 5;
  const int NUM_ITERS = 5;
  WriteRange::WriteDoneCallback on_write_done =
      bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1);

  // Running total of the scratch bytes the group is expected to have allocated.
  int64_t total_bytes = 0;
  checkHWMMetrics(0, 0);
  for (int block_size = 64; block_size <= 64 * 1024; block_size *= 2) {
    // Block 'b' is filled with the byte sequence b, b+1, b+2, ... (truncated to 8 bits).
    vector<vector<uint8_t>> blocks(NUM_BLOCKS, vector<uint8_t>(block_size));
    for (int b = 0; b < NUM_BLOCKS; ++b) {
      std::iota(blocks[b].begin(), blocks[b].end(), b);
    }

    vector<unique_ptr<TmpFileMgr::WriteHandle>> handles(NUM_BLOCKS);
    // The group is expected to allocate fresh scratch space for this block size.
    total_bytes += block_size * NUM_BLOCKS;

    // Repeating the write/read/destroy cycle must not grow the allocated space.
    for (int iter = 0; iter < NUM_ITERS; ++iter) {
      cb_counter_ = 0;
      for (int b = 0; b < NUM_BLOCKS; ++b) {
        ASSERT_OK(file_group.Write(
            MemRange(blocks[b].data(), block_size), on_write_done, &handles[b]));
      }
      WaitForCallbacks(NUM_BLOCKS);
      EXPECT_EQ(total_bytes, BytesAllocated(&file_group));
      checkHWMMetrics(total_bytes, total_bytes);

      // Read each block back, compare it with the source, then release its handle.
      for (int b = 0; b < NUM_BLOCKS; ++b) {
        vector<uint8_t> read_buffer(block_size);
        ASSERT_OK(file_group.Read(
            handles[b].get(), MemRange(read_buffer.data(), block_size)));
        EXPECT_EQ(0, memcmp(read_buffer.data(), blocks[b].data(), block_size));
        file_group.DestroyWriteHandle(move(handles[b]));
      }
      // The released ranges are still counted as allocated; the next iteration
      // should reuse them rather than allocate more.
      EXPECT_EQ(total_bytes, BytesAllocated(&file_group));
      checkHWMMetrics(total_bytes, total_bytes);
    }
  }
  file_group.Close();
  checkHWMMetrics(0, total_bytes);
}

// Regression test for IMPALA-4748, where hitting the process memory limit caused
// internal invariants of TmpFileMgr to be broken on error path.
TEST_F(TmpFileMgrTest, TestProcessMemLimitExceeded) {
  TUniqueId query_id;
  TmpFileMgr::FileGroup file_group(
      test_env_->tmp_file_mgr(), io_mgr(), profile_, query_id);

  // Simulate the asynchronous error from hitting the process mem limit by cancelling
  // the group's io context directly.
  CancelIoContext(&file_group);

  // Any subsequent write through the group must be rejected as internally cancelled.
  const int WRITE_BYTES = 64;
  vector<uint8_t> payload(WRITE_BYTES);
  WriteRange::WriteDoneCallback on_write_done =
      bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1);
  unique_ptr<TmpFileMgr::WriteHandle> write_handle;
  Status write_status =
      file_group.Write(MemRange(payload.data(), WRITE_BYTES), on_write_done, &write_handle);
  EXPECT_EQ(TErrorCode::CANCELLED_INTERNALLY, write_status.code());
  file_group.Close();
  test_env_->TearDownQueries();
}

// Regression test for IMPALA-4820 - unencrypted data can get written to disk.
TEST_F(TmpFileMgrTest, TestEncryptionDuringCancellation) {
  FLAGS_disk_spill_encryption = true;
  // A delay is required for this to reproduce the issue, since writes are often buffered
  // in memory by the OS. The test should succeed regardless of the delay.
#ifndef NDEBUG
  FLAGS_stress_scratch_write_delay_ms = 1000;
#endif
  TUniqueId id;
  TmpFileMgr::FileGroup file_group(test_env_->tmp_file_mgr(), io_mgr(), profile_, id);

  // Make the data fairly large so that we have a better chance of cancelling while the
  // write is in flight.
  const int DATA_SIZE = 8 * 1024 * 1024;
  string data(DATA_SIZE, ' ');
  MemRange data_mem_range(reinterpret_cast<uint8_t*>(&data[0]), DATA_SIZE);

  // Write out a string repeatedly. We don't want to see this written unencrypted
  // to disk.
  string plaintext("the quick brown fox jumped over the lazy dog");
  for (int pos = 0; pos + plaintext.size() < DATA_SIZE; pos += plaintext.size()) {
    memcpy(&data[pos], plaintext.data(), plaintext.size());
  }

  // Start a write in flight, which should encrypt the data and write it to disk.
  unique_ptr<TmpFileMgr::WriteHandle> handle;
  WriteRange::WriteDoneCallback callback =
      bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1);
  ASSERT_OK(file_group.Write(data_mem_range, callback, &handle));
  string file_path = handle->TmpFilePath();

  // Cancel the write - prior to the IMPALA-4820 fix decryption could race with the write.
  Cancel(handle.get());
  WaitForWrite(handle.get());
  ASSERT_OK(file_group.RestoreData(move(handle), data_mem_range));
  WaitForCallbacks(1);

  // Read the data from the scratch file (reusing 'data' as the buffer) and check that
  // the plaintext isn't present. The file is closed before asserting on the read so a
  // failed assertion cannot leak the FILE handle.
  FILE* file = fopen(file_path.c_str(), "rb");
  ASSERT_TRUE(file != nullptr) << "Could not open scratch file " << file_path;
  const size_t bytes_read = fread(&data[0], 1, DATA_SIZE, file);
  fclose(file);
  ASSERT_EQ(DATA_SIZE, bytes_read) << file_path;
  for (int pos = 0; pos + plaintext.size() < DATA_SIZE; pos += plaintext.size()) {
    ASSERT_NE(0, memcmp(&data[pos], plaintext.data(), plaintext.size()))
        << file_path << "@" << pos;
  }
  file_group.Close();
  test_env_->TearDownQueries();
}

TEST_F(TmpFileMgrTest, TestBlockVerification) {
  // Run the shared block-verification scenario without disabling any CPU features.
  this->TestBlockVerification();
}

TEST_F(TmpFileMgrTest, TestBlockVerificationGcmDisabled) {
  // Temporarily disable the PCLMULQDQ CPU feature, which disables AES-GCM, so that the
  // errors reported by the non-AES-GCM verification path are checked as well.
  CpuInfo::TempDisable disable_pclmulqdq(CpuInfo::PCLMULQDQ);
  this->TestBlockVerification();
}

void TmpFileMgrTest::TestBlockVerification() {
  FLAGS_disk_spill_encryption = true;
  TUniqueId id;
  TmpFileMgr::FileGroup file_group(test_env_->tmp_file_mgr(), io_mgr(), profile_, id);
  string data = "the quick brown fox jumped over the lazy dog";
  MemRange data_mem_range(reinterpret_cast<uint8_t*>(&data[0]), data.size());

  // Start a write in flight, which should encrypt the data and write it to disk.
  unique_ptr<TmpFileMgr::WriteHandle> handle;
  WriteRange::WriteDoneCallback callback =
      bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1);
  ASSERT_OK(file_group.Write(data_mem_range, callback, &handle));
  string file_path = handle->TmpFilePath();

  WaitForWrite(handle.get());
  WaitForCallbacks(1);

  // Modify the data in the scratch file and check that a read error occurs.
  LOG(INFO) << "Corrupting " << file_path;
  uint8_t corrupt_byte = data[0] ^ 1;
  FILE* file = fopen(file_path.c_str(), "rb+");
  ASSERT_TRUE(file != nullptr);
  ASSERT_EQ(corrupt_byte, fputc(corrupt_byte, file));
  ASSERT_EQ(0, fclose(file));
  vector<uint8_t> tmp;
  tmp.resize(data.size());
  Status read_status = file_group.Read(handle.get(), MemRange(tmp.data(), tmp.size()));
  LOG(INFO) << read_status.GetDetail();
  EXPECT_EQ(TErrorCode::SCRATCH_READ_VERIFY_FAILED, read_status.code())
      << read_status.GetDetail();

  // Modify the data in memory. Restoring the data should fail.
  LOG(INFO) << "Corrupting data in memory";
  data[0] = corrupt_byte;
  Status restore_status = file_group.RestoreData(move(handle), data_mem_range);
  LOG(INFO) << restore_status.GetDetail();
  EXPECT_EQ(TErrorCode::SCRATCH_READ_VERIFY_FAILED, restore_status.code())
      << restore_status.GetDetail();

  file_group.Close();
  test_env_->TearDownQueries();
}

// Test that the current scratch space bytes and HWM values are proper when different
// FileGroups are used concurrently. This test unit mimics concurrent spilling queries.
TEST_F(TmpFileMgrTest, TestHWMMetric) {
  RuntimeProfile* profile_1 = RuntimeProfile::Create(&obj_pool_, "tmp-file-mgr-test-1");
  RuntimeProfile* profile_2 = RuntimeProfile::Create(&obj_pool_, "tmp-file-mgr-test-2");
  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
  RemoveAndCreateDirs(tmp_dirs);
  TmpFileMgr tmp_file_mgr;
  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, false, metrics_.get()));

  const int64_t LIMIT = 128;
  // A power-of-two so that FileGroup allocates exactly this amount of scratch space.
  const int64_t ALLOC_SIZE = 64;
  TUniqueId id_1;
  TmpFileMgr::FileGroup file_group_1(&tmp_file_mgr, io_mgr(), profile_1, id_1, LIMIT);
  TUniqueId id_2;
  TmpFileMgr::FileGroup file_group_2(&tmp_file_mgr, io_mgr(), profile_2, id_2, LIMIT);

  // files[0..1] belong to file_group_1 and files[2..3] to file_group_2.
  vector<TmpFileMgr::File*> files;
  ASSERT_OK(CreateFiles(&file_group_1, &files));
  ASSERT_OK(CreateFiles(&file_group_2, &files));
  ASSERT_EQ(4, files.size());

  int64_t offset;
  TmpFileMgr::File* alloc_file;

  // Alloc from file_group_1 and file_group_2 interleaving allocations.
  SetNextAllocationIndex(&file_group_1, 0);
  ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset));
  ASSERT_EQ(alloc_file, files[0]);
  ASSERT_EQ(0, offset);

  SetNextAllocationIndex(&file_group_2, 0);
  ASSERT_OK(GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset));
  ASSERT_EQ(alloc_file, files[2]);
  ASSERT_EQ(0, offset);

  ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset));
  ASSERT_EQ(0, offset);
  ASSERT_EQ(alloc_file, files[1]);

  ASSERT_OK(GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset));
  ASSERT_EQ(0, offset);
  ASSERT_EQ(alloc_file, files[3]);

  EXPECT_EQ(LIMIT, BytesAllocated(&file_group_1));
  EXPECT_EQ(LIMIT, BytesAllocated(&file_group_2));

  // Check HWM metrics: current usage drops per closed group, the HWM stays at the peak.
  checkHWMMetrics(2 * LIMIT, 2 * LIMIT);
  file_group_1.Close();
  checkHWMMetrics(LIMIT, 2 * LIMIT);
  file_group_2.Close();
  checkHWMMetrics(0, 2 * LIMIT);
}

// Test that usage per directory is tracked correctly and per-directory limits are
// enforced. Sets up several scratch directories, some with limits, and checks
// that the allocations occur in the right directories.
TEST_F(TmpFileMgrTest, TestDirectoryLimits) {
  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2",
      "/tmp/tmp-file-mgr-test.3"});
  // Limits: 512 bytes on the first directory, 1k on the second, none on the third.
  vector<string> tmp_dir_specs({"/tmp/tmp-file-mgr-test.1:512",
      "/tmp/tmp-file-mgr-test.2:1k", "/tmp/tmp-file-mgr-test.3"});
  RemoveAndCreateDirs(tmp_dirs);
  TmpFileMgr tmp_file_mgr;
  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dir_specs, false, metrics_.get()));

  TmpFileMgr::FileGroup file_group_1(
      &tmp_file_mgr, io_mgr(), RuntimeProfile::Create(&obj_pool_, "p1"), TUniqueId());
  TmpFileMgr::FileGroup file_group_2(
      &tmp_file_mgr, io_mgr(), RuntimeProfile::Create(&obj_pool_, "p2"), TUniqueId());

  vector<TmpFileMgr::File*> files;
  ASSERT_OK(CreateFiles(&file_group_1, &files));
  ASSERT_OK(CreateFiles(&file_group_2, &files));

  // Usage gauges, indexed by the directory's position in 'tmp_dir_specs'.
  IntGauge* const dir_usage[] = {
      metrics_->FindMetricForTesting<IntGauge>(
          "tmp-file-mgr.scratch-space-bytes-used.dir-0"),
      metrics_->FindMetricForTesting<IntGauge>(
          "tmp-file-mgr.scratch-space-bytes-used.dir-1"),
      metrics_->FindMetricForTesting<IntGauge>(
          "tmp-file-mgr.scratch-space-bytes-used.dir-2")};
  // Checks the usage of all three directories at once.
  auto expect_usage = [&](int64_t dir1_bytes, int64_t dir2_bytes, int64_t dir3_bytes) {
    EXPECT_EQ(dir1_bytes, dir_usage[0]->GetValue());
    EXPECT_EQ(dir2_bytes, dir_usage[1]->GetValue());
    EXPECT_EQ(dir3_bytes, dir_usage[2]->GetValue());
  };

  // A power-of-two so that FileGroup allocates exactly this amount of scratch space.
  const int64_t ALLOC_SIZE = 512;
  int64_t offset;
  TmpFileMgr::File* alloc_file;

  // One allocation per directory: all of them fit, so each directory gets one.
  SetNextAllocationIndex(&file_group_1, 0);
  for (size_t dir_idx = 0; dir_idx < tmp_dir_specs.size(); ++dir_idx) {
    ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset));
  }
  expect_usage(ALLOC_SIZE, ALLOC_SIZE, ALLOC_SIZE);

  // The first directory is now full. Allocate from the other group to show that the
  // limits apply across file groups.
  for (int n = 0; n < 2; ++n) {
    ASSERT_OK(GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset));
  }
  expect_usage(ALLOC_SIZE, 2 * ALLOC_SIZE, 2 * ALLOC_SIZE);

  // With the first two directories at their limits, every allocation should go to the
  // last, unlimited directory.
  for (int n = 0; n < 100; ++n) {
    ASSERT_OK(GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset));
  }
  expect_usage(ALLOC_SIZE, 2 * ALLOC_SIZE, 102 * ALLOC_SIZE);

  // Closing a group deletes its files, which must decrement the usage metrics.
  file_group_2.Close();
  expect_usage(ALLOC_SIZE, ALLOC_SIZE, ALLOC_SIZE);

  // The freed space should be reusable.
  ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset));
  EXPECT_EQ(2 * ALLOC_SIZE, dir_usage[1]->GetValue());

  file_group_1.Close();
  expect_usage(0, 0, 0);
}

// Test the case when all per-directory limits are hit. We expect to return a status
// and fail gracefully.
TEST_F(TmpFileMgrTest, TestDirectoryLimitsExhausted) {
  vector<string> tmp_dirs({"/tmp/tmp-file-mgr-test.1", "/tmp/tmp-file-mgr-test.2"});
  vector<string> tmp_dir_specs(
      {"/tmp/tmp-file-mgr-test.1:256kb", "/tmp/tmp-file-mgr-test.2:1mb"});
  // Byte values of the limits written in 'tmp_dir_specs'.
  const int64_t DIR1_LIMIT = 256 * KILOBYTE;
  const int64_t DIR2_LIMIT = MEGABYTE;
  RemoveAndCreateDirs(tmp_dirs);
  TmpFileMgr tmp_file_mgr;
  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dir_specs, false, metrics_.get()));

  TmpFileMgr::FileGroup file_group_1(
      &tmp_file_mgr, io_mgr(), RuntimeProfile::Create(&obj_pool_, "p1"), TUniqueId());
  TmpFileMgr::FileGroup file_group_2(
      &tmp_file_mgr, io_mgr(), RuntimeProfile::Create(&obj_pool_, "p2"), TUniqueId());

  vector<TmpFileMgr::File*> files;
  ASSERT_OK(CreateFiles(&file_group_1, &files));
  ASSERT_OK(CreateFiles(&file_group_2, &files));

  IntGauge* dir1_usage = metrics_->FindMetricForTesting<IntGauge>(
      "tmp-file-mgr.scratch-space-bytes-used.dir-0");
  IntGauge* dir2_usage = metrics_->FindMetricForTesting<IntGauge>(
      "tmp-file-mgr.scratch-space-bytes-used.dir-1");

  // A power-of-two so that FileGroup allocates exactly this amount of scratch space.
  const int64_t ALLOC_SIZE = 512;
  const int64_t ALLOCS_TO_FILL = (DIR1_LIMIT + DIR2_LIMIT) / ALLOC_SIZE;
  int64_t offset;
  TmpFileMgr::File* alloc_file;

  // Fill both directories exactly to their combined capacity from the first group.
  SetNextAllocationIndex(&file_group_1, 0);
  for (int64_t n = 0; n < ALLOCS_TO_FILL; ++n) {
    ASSERT_OK(GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset));
  }
  EXPECT_EQ(DIR1_LIMIT, dir1_usage->GetValue());
  EXPECT_EQ(DIR2_LIMIT, dir2_usage->GetValue());

  // Checks that 'status' is an allocation failure naming the configured scratch dirs
  // and containing 'expected_usage'.
  auto check_exhausted = [](const Status& status, const string& expected_usage) {
    ASSERT_EQ(status.code(), TErrorCode::SCRATCH_ALLOCATION_FAILED);
    EXPECT_STR_CONTAINS(status.GetDetail(), "Could not create files in any configured "
        "scratch directories (--scratch_dirs=/tmp/tmp-file-mgr-test.1/impala-scratch,"
        "/tmp/tmp-file-mgr-test.2/impala-scratch)");
    EXPECT_STR_CONTAINS(status.GetDetail(), expected_usage);
  };

  // With all directories at capacity, allocations from either group should fail.
  ASSERT_NO_FATAL_FAILURE(check_exhausted(
      GroupAllocateSpace(&file_group_1, ALLOC_SIZE, &alloc_file, &offset),
      "1.25 MB of scratch is currently in use by "
      "this Impala Daemon (1.25 MB by this query)"));
  ASSERT_NO_FATAL_FAILURE(check_exhausted(
      GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset),
      "1.25 MB of scratch is currently in use by "
      "this Impala Daemon (0 by this query)"));

  // Once the first group releases its space, the second group must be able to allocate
  // again, i.e. an earlier failure does not permanently block the group.
  file_group_1.Close();
  ASSERT_OK(GroupAllocateSpace(&file_group_2, ALLOC_SIZE, &alloc_file, &offset));
  file_group_2.Close();
}

// Test the directory parsing logic, including the various error cases.
TEST_F(TmpFileMgrTest, TestDirectoryLimitParsing) {
  RemoveAndCreateDirs({"/tmp/tmp-file-mgr-test1", "/tmp/tmp-file-mgr-test2",
      "/tmp/tmp-file-mgr-test3", "/tmp/tmp-file-mgr-test4", "/tmp/tmp-file-mgr-test5",
      "/tmp/tmp-file-mgr-test6", "/tmp/tmp-file-mgr-test7"});
  const int64_t UNLIMITED = numeric_limits<int64_t>::max();

  // Several valid formats: unit suffixes of either case, plain byte counts, no limit.
  const vector<TmpFileMgr::TmpDir>& valid_dirs = GetTmpDirs(
      CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:5g,/tmp/tmp-file-mgr-test2,"
                       "/tmp/tmp-file-mgr-test3:1234,/tmp/tmp-file-mgr-test4:99999999,"
                       "/tmp/tmp-file-mgr-test5:200tb,/tmp/tmp-file-mgr-test6:100MB"));
  EXPECT_EQ(6, valid_dirs.size());
  const int64_t expected_limits[] = {
      5 * GIGABYTE, UNLIMITED, 1234, 99999999, 200 * TERABYTE, 100 * MEGABYTE};
  for (int i = 0; i < 6; ++i) {
    EXPECT_EQ(expected_limits[i], valid_dirs[i].bytes_limit) << "dir index " << i;
  }

  // Each invalid limit format causes its directory to be skipped. The trailing valid
  // entry shows that one bad directory does not cause the rest to be dropped.
  const vector<TmpFileMgr::TmpDir>& surviving_dirs = GetTmpDirs(
      CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:foo,/tmp/tmp-file-mgr-test2:?,"
                       "/tmp/tmp-file-mgr-test3:1.2.3.4,/tmp/tmp-file-mgr-test4: ,"
                       "/tmp/tmp-file-mgr-test5:5pb,/tmp/tmp-file-mgr-test6:10%,"
                       "/tmp/tmp-file-mgr-test1:100"));
  EXPECT_EQ(1, surviving_dirs.size());
  EXPECT_EQ("/tmp/tmp-file-mgr-test1/impala-scratch", surviving_dirs[0].path);
  EXPECT_EQ(100, surviving_dirs[0].bytes_limit);

  // An empty limit, -1, no limit at all and 0 all mean "unlimited".
  const vector<TmpFileMgr::TmpDir>& unlimited_dirs =
      GetTmpDirs(CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:,/tmp/tmp-file-mgr-test2:-1,"
                                  "/tmp/tmp-file-mgr-test3,/tmp/tmp-file-mgr-test4:0"));
  EXPECT_EQ(4, unlimited_dirs.size());
  for (const TmpFileMgr::TmpDir& dir : unlimited_dirs) {
    EXPECT_EQ(UNLIMITED, dir.bytes_limit);
  }

  // More than one colon makes the entry invalid.
  const vector<TmpFileMgr::TmpDir>& extra_colon_dirs = GetTmpDirs(
      CreateTmpFileMgr("/tmp/tmp-file-mgr-test1:1:,/tmp/tmp-file-mgr-test2:10mb::"));
  EXPECT_EQ(0, extra_colon_dirs.size());

  // Empty specifications.
  EXPECT_EQ(0, GetTmpDirs(CreateTmpFileMgr("")).size());
  EXPECT_EQ(2, GetTmpDirs(CreateTmpFileMgr(",")).size());
}
} // namespace impala
