// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <gflags/gflags.h>
#include <gtest/gtest.h>
#include <memory>

#include "kudu/common/schema.h"
#include "kudu/fs/fs-test-util.h"
#include "kudu/tablet/delta_store.h"
#include "kudu/tablet/deltafile.h"
#include "kudu/tablet/delta_tracker.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/util/memenv/memenv.h"
#include "kudu/util/test_macros.h"

// Flags defined elsewhere in the project that the tests below override.
DECLARE_int32(deltafile_default_block_size);
DECLARE_bool(log_block_manager_test_hole_punching);

// Test-local knobs. The row range is inclusive on both ends, and every
// second row in it (starting at first_row_to_update) receives an update.
DEFINE_int32(first_row_to_update, 10000, "the first row to update");
DEFINE_int32(last_row_to_update, 100000, "the last row to update");
DEFINE_int32(n_verify, 1, "number of times to verify the updates "
             "(useful for benchmarks)");

using std::is_sorted;
using std::shared_ptr;

namespace kudu {
namespace tablet {

using fs::CountingReadableBlock;
using fs::ReadableBlock;
using fs::WritableBlock;

// Root path handed to the FsManager that the delta files are written
// through. The fixture backs the FsManager with an in-memory Env, so this
// path lives in that environment rather than on the local filesystem.
const char kTestPath[] = "/tmp/test";

class TestDeltaFile : public ::testing::Test {
 public:
  TestDeltaFile() :
    env_(NewMemEnv(Env::Default())),
    schema_(CreateSchema()),
    arena_(1024, 1024) {
    // Can't check on-disk file size with a memenv.
    FLAGS_log_block_manager_test_hole_punching = false;
  }

 public:
  void SetUp() OVERRIDE {
    fs_manager_.reset(new FsManager(env_.get(), kTestPath));
    ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
    ASSERT_OK(fs_manager_->Open());
  }

  static Schema CreateSchema() {
    SchemaBuilder builder;
    CHECK_OK(builder.AddColumn("val", UINT32));
    return builder.Build();
  }

  void WriteTestFile(int min_timestamp = 0, int max_timestamp = 0) {
    gscoped_ptr<WritableBlock> block;
    ASSERT_OK(fs_manager_->CreateNewBlock(&block));
    test_block_ = block->id();
    DeltaFileWriter dfw(std::move(block));
    ASSERT_OK(dfw.Start());

    // Update even numbered rows.
    faststring buf;

    DeltaStats stats;
    for (int i = FLAGS_first_row_to_update; i <= FLAGS_last_row_to_update; i += 2) {
      for (int timestamp = min_timestamp; timestamp <= max_timestamp; timestamp++) {
        buf.clear();
        RowChangeListEncoder update(&buf);
        uint32_t new_val = timestamp + i;
        update.AddColumnUpdate(schema_.column(0), schema_.column_id(0), &new_val);
        DeltaKey key(i, Timestamp(timestamp));
        RowChangeList rcl(buf);
        ASSERT_OK_FAST(dfw.AppendDelta<REDO>(key, rcl));
        ASSERT_OK_FAST(stats.UpdateStats(key.timestamp(), rcl));
      }
    }
    ASSERT_OK(dfw.WriteDeltaStats(stats));
    ASSERT_OK(dfw.Finish());
  }


  void DoTestRoundTrip() {
    // First write the file.
    WriteTestFile();

    // Then iterate back over it, applying deltas to a fake row block.
    for (int i = 0; i < FLAGS_n_verify; i++) {
      VerifyTestFile();
    }
  }

  Status OpenDeltaFileReader(const BlockId& block_id, shared_ptr<DeltaFileReader>* out) {
    gscoped_ptr<ReadableBlock> block;
    RETURN_NOT_OK(fs_manager_->OpenBlock(block_id, &block));
    return DeltaFileReader::Open(std::move(block), block_id, out, REDO);
  }

  // TODO handle UNDO deltas
  Status OpenDeltaFileIterator(const BlockId& block_id, gscoped_ptr<DeltaIterator>* out) {
    shared_ptr<DeltaFileReader> reader;
    RETURN_NOT_OK(OpenDeltaFileReader(block_id, &reader));
    return OpenDeltaFileIteratorFromReader(REDO, reader, out);
  }

  Status OpenDeltaFileIteratorFromReader(DeltaType type,
                                         const shared_ptr<DeltaFileReader>& reader,
                                         gscoped_ptr<DeltaIterator>* out) {
    MvccSnapshot snap = type == REDO ?
                        MvccSnapshot::CreateSnapshotIncludingAllTransactions() :
                        MvccSnapshot::CreateSnapshotIncludingNoTransactions();
    DeltaIterator* raw_iter;
    RETURN_NOT_OK(reader->NewDeltaIterator(&schema_, snap, &raw_iter));
    out->reset(raw_iter);
    return Status::OK();
  }

  void VerifyTestFile() {
    shared_ptr<DeltaFileReader> reader;
    ASSERT_OK(OpenDeltaFileReader(test_block_, &reader));
    ASSERT_EQ(((FLAGS_last_row_to_update - FLAGS_first_row_to_update) / 2) + 1,
              reader->delta_stats().update_count_for_col_id(schema_.column_id(0)));
    ASSERT_EQ(0, reader->delta_stats().delete_count());
    gscoped_ptr<DeltaIterator> it;
    Status s = OpenDeltaFileIteratorFromReader(REDO, reader, &it);
    if (s.IsNotFound()) {
      FAIL() << "Iterator fell outside of the range of an include-all snapshot";
    }
    ASSERT_OK(s);
    ASSERT_OK(it->Init(nullptr));

    RowBlock block(schema_, 100, &arena_);

    // Iterate through the faked table, starting with batches that
    // come before all of the updates, and extending a bit further
    // past the updates, to ensure that nothing breaks on the boundaries.
    ASSERT_OK(it->SeekToOrdinal(0));

    int start_row = 0;
    while (start_row < FLAGS_last_row_to_update + 10000) {
      block.ZeroMemory();
      arena_.Reset();

      ASSERT_OK_FAST(it->PrepareBatch(block.nrows(), DeltaIterator::PREPARE_FOR_APPLY));
      ColumnBlock dst_col = block.column_block(0);
      ASSERT_OK_FAST(it->ApplyUpdates(0, &dst_col));

      for (int i = 0; i < block.nrows(); i++) {
        uint32_t row = start_row + i;
        bool should_be_updated = (row >= FLAGS_first_row_to_update) &&
          (row <= FLAGS_last_row_to_update) &&
          (row % 2 == 0);

        DCHECK_EQ(block.row(i).cell_ptr(0), dst_col.cell_ptr(i));
        uint32_t updated_val = *schema_.ExtractColumnFromRow<UINT32>(block.row(i), 0);
        VLOG(2) << "row " << row << ": " << updated_val;
        uint32_t expected_val = should_be_updated ? row : 0;
        // Don't use ASSERT_EQ, since it's slow (records positive results, not just negative)
        if (updated_val != expected_val) {
          FAIL() << "failed on row " << row <<
            ": expected " << expected_val << ", got " << updated_val;
        }
      }

      start_row += block.nrows();
    }
  }

 protected:
  gscoped_ptr<Env> env_;
  gscoped_ptr<FsManager> fs_manager_;
  Schema schema_;
  Arena arena_;
  BlockId test_block_;
};

// Dumps every delta in the test file through DebugDumpDeltaIterator() and
// checks that the dump is sorted and has one entry per updated row.
TEST_F(TestDeltaFile, TestDumpDeltaFileIterator) {
  // Bail out if the file could not be written; the rest of the test would
  // otherwise run against an invalid block.
  ASSERT_NO_FATAL_FAILURE(WriteTestFile());

  gscoped_ptr<DeltaIterator> it;
  Status s = OpenDeltaFileIterator(test_block_, &it);
  if (s.IsNotFound()) {
    FAIL() << "Iterator fell outside of the range of an include-all snapshot";
  }
  ASSERT_OK(s);
  vector<string> it_contents;
  ASSERT_OK(DebugDumpDeltaIterator(REDO,
                                   it.get(),
                                   schema_,
                                   ITERATE_OVER_ALL_ROWS,
                                   &it_contents));
  for (const string& str : it_contents) {
    VLOG(1) << str;
  }
  ASSERT_TRUE(is_sorted(it_contents.begin(), it_contents.end()));
  ASSERT_EQ(it_contents.size(), (FLAGS_last_row_to_update - FLAGS_first_row_to_update) / 2 + 1);
}

// Copies the test file into a second block via WriteDeltaIteratorToFile()
// and checks that the copy can be opened and dumps to the same number of
// sorted entries.
TEST_F(TestDeltaFile, TestWriteDeltaFileIteratorToFile) {
  // Bail out if the source file could not be written.
  ASSERT_NO_FATAL_FAILURE(WriteTestFile());
  gscoped_ptr<DeltaIterator> it;
  Status s = OpenDeltaFileIterator(test_block_, &it);
  if (s.IsNotFound()) {
    FAIL() << "Iterator fell outside of the range of an include-all snapshot";
  }
  ASSERT_OK(s);

  // Write everything the iterator yields into a brand new block.
  gscoped_ptr<WritableBlock> block;
  ASSERT_OK(fs_manager_->CreateNewBlock(&block));
  BlockId block_id(block->id());
  DeltaFileWriter dfw(std::move(block));
  ASSERT_OK(dfw.Start());
  ASSERT_OK(WriteDeltaIteratorToFile<REDO>(it.get(),
                                           ITERATE_OVER_ALL_ROWS,
                                           &dfw));
  ASSERT_OK(dfw.Finish());

  // If delta stats are incorrect, then a Status::NotFound would be
  // returned.
  ASSERT_OK(OpenDeltaFileIterator(block_id, &it));
  vector<string> it_contents;
  ASSERT_OK(DebugDumpDeltaIterator(REDO,
                                   it.get(),
                                   schema_,
                                   ITERATE_OVER_ALL_ROWS,
                                   &it_contents));
  for (const string& str : it_contents) {
    VLOG(1) << str;
  }
  ASSERT_TRUE(is_sorted(it_contents.begin(), it_contents.end()));
  ASSERT_EQ(it_contents.size(), (FLAGS_last_row_to_update - FLAGS_first_row_to_update) / 2 + 1);
}

// Round-trip test run with a deliberately tiny delta block size.
TEST_F(TestDeltaFile, TestRoundTripTinyDeltaBlocks) {
  // The saver undoes the flag override below once the test body exits.
  google::FlagSaver restore_flags_on_exit;

  // With blocks this small, multiple delta blocks correspond to a single
  // underlying data block, which is the case we want good coverage of.
  FLAGS_deltafile_default_block_size = 256;

  DoTestRoundTrip();
}

// Writes the test delta file and verifies it FLAGS_n_verify times, using
// the delta block size currently set in the flags.
TEST_F(TestDeltaFile, TestRoundTrip) {
  DoTestRoundTrip();
}

// Walks the test file in batches of 100 rows, collecting the mutations of
// each batch and logging them. Only the status of each call is checked;
// the collected mutations themselves are not compared against anything.
TEST_F(TestDeltaFile, TestCollectMutations) {
  // Bail out if the file could not be written.
  ASSERT_NO_FATAL_FAILURE(WriteTestFile());

  gscoped_ptr<DeltaIterator> it;
  Status s = OpenDeltaFileIterator(test_block_, &it);
  if (s.IsNotFound()) {
    FAIL() << "Iterator fell outside of the range of an include-all snapshot";
  }
  ASSERT_OK(s);

  ASSERT_OK(it->Init(nullptr));
  ASSERT_OK(it->SeekToOrdinal(0));

  vector<Mutation *> mutations;
  mutations.resize(100);

  int start_row = 0;
  while (start_row < FLAGS_last_row_to_update + 10000) {
    // Clear the list heads left over from the previous batch before the
    // arena backing them is reset.
    std::fill(mutations.begin(), mutations.end(), static_cast<Mutation *>(nullptr));

    arena_.Reset();
    ASSERT_OK_FAST(it->PrepareBatch(mutations.size(), DeltaIterator::PREPARE_FOR_COLLECT));
    ASSERT_OK(it->CollectMutations(&mutations, &arena_));

    for (size_t i = 0; i < mutations.size(); i++) {
      Mutation *mut_head = mutations[i];
      if (mut_head != nullptr) {
        rowid_t row = static_cast<rowid_t>(start_row + i);
        string str = Mutation::StringifyMutationList(schema_, mut_head);
        VLOG(1) << "Mutation on row " << row << ": " << str;
      }
    }

    start_row += mutations.size();
  }
}

// Writes deltas with timestamps 10..20 and checks that NewDeltaIterator()
// returns NotFound for a snapshot at timestamp 9, but succeeds for
// snapshots at timestamps 15 and 21.
TEST_F(TestDeltaFile, TestSkipsDeltasOutOfRange) {
  // Bail out if the file could not be written.
  ASSERT_NO_FATAL_FAILURE(WriteTestFile(10, 20));
  shared_ptr<DeltaFileReader> reader;
  ASSERT_OK(OpenDeltaFileReader(test_block_, &reader));

  // Takes ownership of each successfully created iterator so it is freed.
  gscoped_ptr<DeltaIterator> iter;

  // should skip
  MvccSnapshot snap1(Timestamp(9));
  ASSERT_FALSE(snap1.MayHaveCommittedTransactionsAtOrAfter(Timestamp(10)));
  DeltaIterator* raw_iter = nullptr;
  Status s = reader->NewDeltaIterator(&schema_, snap1, &raw_iter);
  ASSERT_TRUE(s.IsNotFound());
  ASSERT_TRUE(raw_iter == nullptr);

  // should include
  raw_iter = nullptr;
  MvccSnapshot snap2(Timestamp(15));
  ASSERT_OK(reader->NewDeltaIterator(&schema_, snap2, &raw_iter));
  ASSERT_TRUE(raw_iter != nullptr);
  iter.reset(raw_iter);

  // should include
  raw_iter = nullptr;
  MvccSnapshot snap3(Timestamp(21));
  ASSERT_OK(reader->NewDeltaIterator(&schema_, snap3, &raw_iter));
  ASSERT_TRUE(raw_iter != nullptr);
  iter.reset(raw_iter);
}

// Checks, by counting the bytes read from the underlying block, that
// OpenNoInit() reads nothing, that only the first Init() reads, and that a
// regular Open() reads the same number of bytes as OpenNoInit() + Init().
TEST_F(TestDeltaFile, TestLazyInit) {
  // Bail out if the file could not be written.
  ASSERT_NO_FATAL_FAILURE(WriteTestFile());

  // Open it using a "counting" readable block.
  gscoped_ptr<ReadableBlock> block;
  ASSERT_OK(fs_manager_->OpenBlock(test_block_, &block));
  size_t bytes_read = 0;
  gscoped_ptr<ReadableBlock> count_block(
      new CountingReadableBlock(std::move(block), &bytes_read));

  // Lazily opening the delta file should not trigger any reads.
  shared_ptr<DeltaFileReader> reader;
  ASSERT_OK(DeltaFileReader::OpenNoInit(
      std::move(count_block), test_block_, &reader, REDO));
  ASSERT_EQ(0, bytes_read);

  // But initializing it should (only the first time).
  ASSERT_OK(reader->Init());
  ASSERT_GT(bytes_read, 0);
  size_t bytes_read_after_init = bytes_read;
  ASSERT_OK(reader->Init());
  ASSERT_EQ(bytes_read_after_init, bytes_read);

  // And let's test non-lazy open for good measure; it should yield the
  // same number of bytes read.
  ASSERT_OK(fs_manager_->OpenBlock(test_block_, &block));
  bytes_read = 0;
  count_block.reset(new CountingReadableBlock(std::move(block), &bytes_read));
  ASSERT_OK(DeltaFileReader::Open(std::move(count_block), test_block_, &reader, REDO));
  ASSERT_EQ(bytes_read_after_init, bytes_read);
}

} // namespace tablet
} // namespace kudu
