blob: bd79ba8338466495a197dbf98c23ca894e4d546a [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <functional>
#include <initializer_list>
#include <memory>
#include <sstream>
#include <string>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/cfile/block_cache.h"
#include "kudu/cfile/block_handle.h"
#include "kudu/cfile/block_pointer.h"
#include "kudu/cfile/cfile-test-base.h"
#include "kudu/cfile/cfile.pb.h"
#include "kudu/cfile/cfile_reader.h"
#include "kudu/cfile/cfile_util.h"
#include "kudu/cfile/cfile_writer.h"
#include "kudu/cfile/index_btree.h"
#include "kudu/cfile/type_encodings.h"
#include "kudu/common/column_materialization_context.h"
#include "kudu/common/columnblock-test-util.h"
#include "kudu/common/columnblock.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/encoded_key.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/rowblock_memory.h"
#include "kudu/common/rowid.h"
#include "kudu/common/schema.h"
#include "kudu/common/types.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/fs-test-util.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/fs/io_context.h"
#include "kudu/gutil/casts.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/singleton.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/bitmap.h"
#include "kudu/util/cache.h"
#include "kudu/util/compression/compression.pb.h"
#include "kudu/util/env.h"
#include "kudu/util/int128.h"
#include "kudu/util/int128_util.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/nvm_cache.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
namespace kudu {
class Arena;
} // namespace kudu
DECLARE_bool(cfile_write_checksums);
DECLARE_bool(cfile_verify_checksums);
DECLARE_string(block_cache_type);
DECLARE_bool(force_block_cache_capacity);
DECLARE_int64(block_cache_capacity_mb);
DECLARE_string(nvm_cache_path);
DECLARE_bool(nvm_cache_simulate_allocation_failure);
METRIC_DECLARE_counter(block_cache_hits_caching);
METRIC_DECLARE_entity(server);
using kudu::fs::BlockManager;
using kudu::fs::CountingReadableBlock;
using kudu::fs::CreateCorruptBlock;
using kudu::fs::ReadableBlock;
using kudu::fs::WritableBlock;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
using strings::Substitute;
namespace kudu {
namespace cfile {
class TestCFile : public CFileTestBase {
protected:
template <class DataGeneratorType>
void TestReadWriteFixedSizeTypes(EncodingType encoding) {
BlockId block_id;
DataGeneratorType generator;
WriteTestFile(&generator, encoding, NO_COMPRESSION, 10000, SMALL_BLOCKSIZE, &block_id);
unique_ptr<ReadableBlock> block;
ASSERT_OK(fs_manager_->OpenBlock(block_id, &block));
unique_ptr<CFileReader> reader;
ASSERT_OK(CFileReader::Open(std::move(block), ReaderOptions(), &reader));
BlockPointer ptr;
unique_ptr<CFileIterator> iter;
ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr));
ASSERT_OK(iter->SeekToOrdinal(5000));
ASSERT_EQ(5000u, iter->GetCurrentOrdinal());
// Seek to last key exactly, should succeed.
ASSERT_OK(iter->SeekToOrdinal(9999));
ASSERT_EQ(9999u, iter->GetCurrentOrdinal());
// Seek to after last key. Should result in not found.
ASSERT_TRUE(iter->SeekToOrdinal(10000).IsNotFound());
// Seek to start of file
ASSERT_OK(iter->SeekToOrdinal(0));
ASSERT_EQ(0u, iter->GetCurrentOrdinal());
// Fetch all data.
ScopedColumnBlock<DataGeneratorType::kDataType> out(10000);
size_t n = 10000;
SelectionVector sel(10000);
ColumnMaterializationContext out_ctx = CreateNonDecoderEvalContext(&out, &sel);
ASSERT_OK(iter->CopyNextValues(&n, &out_ctx));
ASSERT_EQ(10000, n);
DataGeneratorType data_generator_pre;
for (int i = 0; i < 10000; i++) {
if (out[i] != data_generator_pre.BuildTestValue(0,i)) {
FAIL() << "mismatch at index " << i
<< " expected: " << data_generator_pre.BuildTestValue(0,i)
<< " got: " << out[i];
}
out[i] = 0;
}
// Fetch all data using small batches of only a few rows.
// This should catch edge conditions like a batch lining up exactly
// with the end of a block.
unsigned int seed = time(nullptr);
LOG(INFO) << "Using random seed: " << seed;
srand(seed);
ASSERT_OK(iter->SeekToOrdinal(0));
size_t fetched = 0;
while (fetched < 10000) {
ColumnBlock advancing_block(out.type_info(),
nullptr,
out.data() + (fetched * out.stride()),
out.nrows() - fetched,
out.memory());
ColumnMaterializationContext adv_ctx = CreateNonDecoderEvalContext(&advancing_block, &sel);
ASSERT_TRUE(iter->HasNext());
size_t batch_size = random() % 5 + 1;
size_t n = batch_size;
ASSERT_OK(iter->CopyNextValues(&n, &adv_ctx));
ASSERT_LE(n, batch_size);
fetched += n;
}
ASSERT_FALSE(iter->HasNext());
DataGeneratorType data_generator_post;
// Re-verify
for (int i = 0; i < 10000; i++) {
if (out[i] != data_generator_post.BuildTestValue(0,i)) {
FAIL() << "mismatch at index " << i
<< " expected: " << data_generator_post.BuildTestValue(0,i)
<< " got: " << out[i];
}
out[i] = 0;
}
TimeReadFile(fs_manager_.get(), block_id, &n);
ASSERT_EQ(10000, n);
}
template <class DataGeneratorType>
void TimeSeekAndReadFileWithNulls(DataGeneratorType* generator,
const BlockId& block_id, size_t num_entries) {
unique_ptr<ReadableBlock> block;
ASSERT_OK(fs_manager_->OpenBlock(block_id, &block));
unique_ptr<CFileReader> reader;
ASSERT_OK(CFileReader::Open(std::move(block), ReaderOptions(), &reader));
ASSERT_EQ(DataGeneratorType::kDataType, reader->type_info()->type());
unique_ptr<CFileIterator> iter;
ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr));
RowBlockMemory mem;
ScopedColumnBlock<DataGeneratorType::kDataType> cb(10);
SelectionVector sel(10);
ColumnMaterializationContext ctx = CreateNonDecoderEvalContext(&cb, &sel);
const int kNumLoops = AllowSlowTests() ? num_entries : 10;
for (int loop = 0; loop < kNumLoops; loop++) {
// Seek to a random point in the file,
// or just try each entry as starting point if you're running SlowTests
int target = AllowSlowTests() ? loop : (random() % (num_entries - 1));
SCOPED_TRACE(target);
ASSERT_OK(iter->SeekToOrdinal(target));
ASSERT_TRUE(iter->HasNext());
// Read and verify several ColumnBlocks from this point in the file.
int read_offset = target;
for (int block = 0; block < 3 && iter->HasNext(); block++) {
size_t n = cb.nrows();
ASSERT_OK_FAST(iter->CopyNextValues(&n, &ctx));
ASSERT_EQ(n, std::min(num_entries - read_offset, cb.nrows()));
// Verify that the block data is correct.
generator->Build(read_offset, n);
for (size_t j = 0; j < n; ++j) {
bool expected_null = generator->TestValueShouldBeNull(read_offset + j);
ASSERT_EQ(expected_null, cb.is_null(j));
if (!expected_null) {
ASSERT_EQ((*generator)[j], cb[j]);
}
}
cb.memory()->Reset();
read_offset += n;
}
}
}
template <class DataGeneratorType>
void TestNullTypes(DataGeneratorType* generator, EncodingType encoding,
CompressionType compression) {
BlockId block_id;
WriteTestFile(generator, encoding, compression, 10000, SMALL_BLOCKSIZE, &block_id);
size_t n;
NO_FATALS(TimeReadFile(fs_manager_.get(), block_id, &n));
ASSERT_EQ(n, 10000);
generator->Reset();
NO_FATALS(TimeSeekAndReadFileWithNulls(generator, block_id, n));
}
void TestReadWriteRawBlocks(CompressionType compression, int num_entries) {
// Test Write
unique_ptr<WritableBlock> sink;
ASSERT_OK(fs_manager_->CreateNewBlock({}, &sink));
BlockId id = sink->id();
WriterOptions opts;
opts.write_posidx = true;
opts.write_validx = false;
opts.storage_attributes.cfile_block_size = FLAGS_cfile_test_block_size;
opts.storage_attributes.compression = compression;
opts.storage_attributes.encoding = PLAIN_ENCODING;
CFileWriter w(opts, GetTypeInfo(STRING), false, std::move(sink));
ASSERT_OK(w.Start());
for (uint32_t i = 0; i < num_entries; i++) {
vector<Slice> slices;
slices.emplace_back("Head");
slices.emplace_back("Body");
slices.emplace_back("Tail");
slices.emplace_back(reinterpret_cast<uint8_t *>(&i), 4);
ASSERT_OK(w.AppendRawBlock(slices, i, nullptr, Slice(), "raw-data"));
}
ASSERT_OK(w.Finish());
// Test Read
unique_ptr<ReadableBlock> source;
ASSERT_OK(fs_manager_->OpenBlock(id, &source));
unique_ptr<CFileReader> reader;
ASSERT_OK(CFileReader::Open(std::move(source), ReaderOptions(), &reader));
ASSERT_EQ(reader->footer().compression(), compression);
if (FLAGS_cfile_write_checksums) {
ASSERT_TRUE(reader->footer().incompatible_features() & IncompatibleFeatures::CHECKSUM);
} else {
ASSERT_FALSE(reader->footer().incompatible_features() & IncompatibleFeatures::CHECKSUM);
}
unique_ptr<IndexTreeIterator> iter;
iter.reset(IndexTreeIterator::Create(nullptr, reader.get(), reader->posidx_root()));
ASSERT_OK(iter->SeekToFirst());
uint8_t data[16];
Slice expected_data(data, 16);
memcpy(data, "HeadBodyTail", 12);
uint32_t count = 0;
do {
scoped_refptr<BlockHandle> dblk_data;
BlockPointer blk_ptr = iter->GetCurrentBlockPointer();
ASSERT_OK(reader->ReadBlock(nullptr, blk_ptr, CFileReader::CACHE_BLOCK, &dblk_data));
memcpy(data + 12, &count, 4);
ASSERT_EQ(expected_data, dblk_data->data());
count++;
} while (iter->Next().ok());
ASSERT_EQ(num_entries, count);
}
void TestReadWriteStrings(EncodingType encoding) {
TestReadWriteStrings(encoding, [](size_t val) {
return StringPrintf("hello %04zd", val);
});
}
void TestReadWriteStrings(EncodingType encoding,
std::function<string(size_t)> formatter);
#ifdef NDEBUG
void TestWrite100MFileStrings(EncodingType encoding) {
BlockId block_id;
LOG_TIMING(INFO, "writing 100M strings") {
LOG(INFO) << "Starting writefile";
StringDataGenerator<false> generator([](size_t idx) {
char buf[kFastToBufferSize];
FastHex64ToBuffer(idx, buf);
return string(buf);
});
WriteTestFile(&generator, encoding, NO_COMPRESSION, 100000000, NO_FLAGS, &block_id);
LOG(INFO) << "Done writing";
}
LOG_TIMING(INFO, "reading 100M strings") {
LOG(INFO) << "Starting readfile";
size_t n;
TimeReadFile(fs_manager_.get(), block_id, &n);
ASSERT_EQ(100000000, n);
LOG(INFO) << "End readfile";
}
}
#endif
void TestWriteDictEncodingLowCardinalityStrings(int64_t num_rows) {
BlockId block_id;
LOG_TIMING(INFO, Substitute("writing $0 strings with dupes", num_rows)) {
LOG(INFO) << "Starting writefile";
// The second parameter specify how many distinct strings are there
DuplicateStringDataGenerator<false> generator("hello %zu", 256);
WriteTestFile(&generator, DICT_ENCODING, NO_COMPRESSION, num_rows, NO_FLAGS, &block_id);
LOG(INFO) << "Done writing";
}
LOG_TIMING(INFO, Substitute("reading $0 strings with dupes", num_rows)) {
LOG(INFO) << "Starting readfile";
size_t n;
TimeReadFile(fs_manager_.get(), block_id, &n);
ASSERT_EQ(num_rows, n);
LOG(INFO) << "End readfile";
}
}
Status CorruptAndReadBlock(const BlockId block_id, const uint64_t corrupt_offset,
uint8_t flip_bit) {
BlockId new_id;
RETURN_NOT_OK(CreateCorruptBlock(
fs_manager_.get(), block_id, corrupt_offset, flip_bit, &new_id));
// Open and read the corrupt block with the CFileReader
unique_ptr<ReadableBlock> corrupt_source;
RETURN_NOT_OK(fs_manager_->OpenBlock(new_id, &corrupt_source));
unique_ptr<CFileReader> reader;
ReaderOptions opts;
const fs::IOContext io_context({ "corrupted-dummy-tablet" });
opts.io_context = &io_context;
RETURN_NOT_OK(CFileReader::Open(std::move(corrupt_source), std::move(opts), &reader));
unique_ptr<IndexTreeIterator> iter;
iter.reset(IndexTreeIterator::Create(&io_context, reader.get(), reader->posidx_root()));
RETURN_NOT_OK(iter->SeekToFirst());
do {
scoped_refptr<BlockHandle> dblk_data;
BlockPointer blk_ptr = iter->GetCurrentBlockPointer();
RETURN_NOT_OK(reader->ReadBlock(&io_context, blk_ptr,
CFileReader::DONT_CACHE_BLOCK, &dblk_data));
} while (iter->Next().ok());
return Status::OK();
}
};
// Subclass of TestCFile which is parameterized on the block cache type.
// Tests that use TEST_P(TestCFileBothCacheMemoryTypes, ...) will run twice --
// once for each cache memory type (DRAM, NVM).
class TestCFileBothCacheMemoryTypes :
public TestCFile,
public ::testing::WithParamInterface<Cache::MemoryType> {
public:
void SetUp() OVERRIDE {
// The NVM cache can run using any directory as its path -- it doesn't have
// a lot of practical use outside of an actual NVM device, but for testing
// purposes, we'll point it at our test dir, unless otherwise specified.
if (google::GetCommandLineFlagInfoOrDie("nvm_cache_path").is_default) {
FLAGS_nvm_cache_path = GetTestPath("nvm-cache");
ASSERT_OK(Env::Default()->CreateDir(FLAGS_nvm_cache_path));
}
switch (GetParam()) {
case Cache::MemoryType::DRAM:
FLAGS_block_cache_type = "DRAM";
break;
case Cache::MemoryType::NVM:
FLAGS_block_cache_type = "NVM";
break;
default:
LOG(FATAL) << "Unknown block cache type: '"
<< static_cast<int16_t>(GetParam());
}
CFileTestBase::SetUp();
}
void TearDown() OVERRIDE {
Singleton<BlockCache>::UnsafeReset();
}
};
INSTANTIATE_TEST_SUITE_P(CacheMemoryTypes, TestCFileBothCacheMemoryTypes,
::testing::Values(Cache::MemoryType::DRAM,
Cache::MemoryType::NVM));
template <DataType type>
void CopyOne(CFileIterator* it, typename TypeTraits<type>::cpp_type* ret, RowBlockMemory* mem) {
ColumnBlock cb(GetTypeInfo(type), nullptr, ret, 1, mem);
SelectionVector sel(1);
ColumnMaterializationContext ctx(0, nullptr, &cb, &sel);
ctx.SetDecoderEvalNotSupported();
size_t n = 1;
ASSERT_OK(it->CopyNextValues(&n, &ctx));
ASSERT_EQ(1, n);
}
#ifdef NDEBUG
// Only run the 100M entry tests in non-debug mode.
// They take way too long with debugging enabled.
TEST_P(TestCFileBothCacheMemoryTypes, TestWrite100MFileInts) {
RETURN_IF_NO_NVM_CACHE(GetParam());
BlockId block_id;
LOG_TIMING(INFO, "writing 100m ints") {
LOG(INFO) << "Starting writefile";
Int32DataGenerator<false> generator;
WriteTestFile(&generator, BIT_SHUFFLE, NO_COMPRESSION, 100000000, NO_FLAGS, &block_id);
LOG(INFO) << "Done writing";
}
LOG_TIMING(INFO, "reading 100M ints") {
LOG(INFO) << "Starting readfile";
size_t n;
TimeReadFile(fs_manager_.get(), block_id, &n);
ASSERT_EQ(100000000, n);
LOG(INFO) << "End readfile";
}
}
TEST_P(TestCFileBothCacheMemoryTypes, TestWrite100MFileNullableInts) {
RETURN_IF_NO_NVM_CACHE(GetParam());
BlockId block_id;
LOG_TIMING(INFO, "writing 100m nullable ints") {
LOG(INFO) << "Starting writefile";
Int32DataGenerator<true> generator;
WriteTestFile(&generator, PLAIN_ENCODING, NO_COMPRESSION, 100000000, NO_FLAGS, &block_id);
LOG(INFO) << "Done writing";
}
LOG_TIMING(INFO, "reading 100M nullable ints") {
LOG(INFO) << "Starting readfile";
size_t n;
TimeReadFile(fs_manager_.get(), block_id, &n);
ASSERT_EQ(100000000, n);
LOG(INFO) << "End readfile";
}
}
TEST_P(TestCFileBothCacheMemoryTypes, TestWrite100MFileStringsPrefixEncoding) {
RETURN_IF_NO_NVM_CACHE(GetParam());
TestWrite100MFileStrings(PREFIX_ENCODING);
}
TEST_P(TestCFileBothCacheMemoryTypes, TestWrite100MUniqueStringsDictEncoding) {
RETURN_IF_NO_NVM_CACHE(GetParam());
TestWrite100MFileStrings(DICT_ENCODING);
}
TEST_P(TestCFileBothCacheMemoryTypes, TestWrite100MLowCardinalityStringsDictEncoding) {
RETURN_IF_NO_NVM_CACHE(GetParam());
TestWriteDictEncodingLowCardinalityStrings(100 * 1e6);
}
TEST_P(TestCFileBothCacheMemoryTypes, TestWrite100MFileStringsPlainEncoding) {
RETURN_IF_NO_NVM_CACHE(GetParam());
TestWrite100MFileStrings(PLAIN_ENCODING);
}
#endif
// Write and Read 1 million unique strings with dictionary encoding
TEST_P(TestCFileBothCacheMemoryTypes, TestWrite1MUniqueFileStringsDictEncoding) {
RETURN_IF_NO_NVM_CACHE(GetParam());
BlockId block_id;
LOG_TIMING(INFO, "writing 1M unique strings") {
LOG(INFO) << "Starting writefile";
StringDataGenerator<false> generator("hello %zu");
WriteTestFile(&generator, DICT_ENCODING, NO_COMPRESSION, 1000000, NO_FLAGS, &block_id);
LOG(INFO) << "Done writing";
}
LOG_TIMING(INFO, "reading 1M strings") {
LOG(INFO) << "Starting readfile";
size_t n;
TimeReadFile(fs_manager_.get(), block_id, &n);
ASSERT_EQ(1000000, n);
LOG(INFO) << "End readfile";
}
}
// Write and Read 1 million strings, which contains duplicates with dictionary encoding
TEST_P(TestCFileBothCacheMemoryTypes, TestWrite1MLowCardinalityStringsDictEncoding) {
RETURN_IF_NO_NVM_CACHE(GetParam());
TestWriteDictEncodingLowCardinalityStrings(1000000);
}
TEST_P(TestCFileBothCacheMemoryTypes, TestReadWriteUInt32) {
RETURN_IF_NO_NVM_CACHE(GetParam());
for (auto enc : { PLAIN_ENCODING, RLE }) {
TestReadWriteFixedSizeTypes<UInt32DataGenerator<false>>(enc);
}
}
TEST_P(TestCFileBothCacheMemoryTypes, TestReadWriteInt32) {
RETURN_IF_NO_NVM_CACHE(GetParam());
for (auto enc : { PLAIN_ENCODING, RLE }) {
TestReadWriteFixedSizeTypes<Int32DataGenerator<false>>(enc);
}
}
TEST_P(TestCFileBothCacheMemoryTypes, TestReadWriteUInt64) {
RETURN_IF_NO_NVM_CACHE(GetParam());
for (auto enc : { PLAIN_ENCODING, RLE, BIT_SHUFFLE }) {
TestReadWriteFixedSizeTypes<UInt64DataGenerator<false>>(enc);
}
}
TEST_P(TestCFileBothCacheMemoryTypes, TestReadWriteInt64) {
RETURN_IF_NO_NVM_CACHE(GetParam());
for (auto enc : { PLAIN_ENCODING, RLE, BIT_SHUFFLE }) {
TestReadWriteFixedSizeTypes<Int64DataGenerator<false>>(enc);
}
}
TEST_P(TestCFileBothCacheMemoryTypes, TestReadWriteInt128) {
RETURN_IF_NO_NVM_CACHE(GetParam());
TestReadWriteFixedSizeTypes<Int128DataGenerator<false>>(PLAIN_ENCODING);
}
TEST_P(TestCFileBothCacheMemoryTypes, TestFixedSizeReadWritePlainEncodingFloat) {
RETURN_IF_NO_NVM_CACHE(GetParam());
TestReadWriteFixedSizeTypes<FPDataGenerator<FLOAT, false>>(PLAIN_ENCODING);
}
TEST_P(TestCFileBothCacheMemoryTypes, TestFixedSizeReadWritePlainEncodingDouble) {
RETURN_IF_NO_NVM_CACHE(GetParam());
TestReadWriteFixedSizeTypes<FPDataGenerator<DOUBLE, false>>(PLAIN_ENCODING);
}
// Test for BitShuffle builder for UINT8, INT8, UINT16, INT16, UINT32, INT32,
// UINT64, INT64, INT128, FLOAT, DOUBLE
template <typename T>
class BitShuffleTest : public TestCFile {
public:
void TestBitShuffle() {
TestReadWriteFixedSizeTypes<T>(BIT_SHUFFLE);
}
};
typedef ::testing::Types<UInt8DataGenerator<false>,
Int8DataGenerator<false>,
UInt16DataGenerator<false>,
Int16DataGenerator<false>,
UInt32DataGenerator<false>,
Int32DataGenerator<false>,
UInt64DataGenerator<false>,
Int64DataGenerator<false>,
Int128DataGenerator<false>,
FPDataGenerator<FLOAT, false>,
FPDataGenerator<DOUBLE, false> > MyTypes;
TYPED_TEST_SUITE(BitShuffleTest, MyTypes);
TYPED_TEST(BitShuffleTest, TestFixedSizeReadWriteBitShuffle) {
this->TestBitShuffle();
}
void EncodeStringKey(const Schema& schema,
const Slice& key,
Arena* arena,
EncodedKey** encoded_key) {
EncodedKeyBuilder kb(&schema, arena);
kb.AddColumnKey(&key);
*encoded_key = kb.BuildEncodedKey();
}
void TestCFile::TestReadWriteStrings(EncodingType encoding,
std::function<string(size_t)> formatter) {
Schema schema({ ColumnSchema("key", STRING) }, 1);
const int nrows = 10000;
BlockId block_id;
StringDataGenerator<false> generator(formatter);
WriteTestFile(&generator, encoding, NO_COMPRESSION, nrows,
SMALL_BLOCKSIZE | WRITE_VALIDX, &block_id);
unique_ptr<ReadableBlock> block;
ASSERT_OK(fs_manager_->OpenBlock(block_id, &block));
unique_ptr<CFileReader> reader;
ASSERT_OK(CFileReader::Open(std::move(block), ReaderOptions(), &reader));
rowid_t reader_nrows;
ASSERT_OK(reader->CountRows(&reader_nrows));
ASSERT_EQ(nrows, reader_nrows);
BlockPointer ptr;
unique_ptr<CFileIterator> iter;
ASSERT_OK(reader->NewIterator(&iter, CFileReader::CACHE_BLOCK, nullptr));
RowBlockMemory mem;
ASSERT_OK(iter->SeekToOrdinal(5000));
ASSERT_EQ(5000, iter->GetCurrentOrdinal());
Slice s;
CopyOne<STRING>(iter.get(), &s, &mem);
ASSERT_EQ(formatter(5000), s.ToString());
// Seek to last key exactly, should succeed
ASSERT_OK(iter->SeekToOrdinal(9999));
ASSERT_EQ(9999, iter->GetCurrentOrdinal());
// Seek to after last key. Should result in not found.
ASSERT_TRUE(iter->SeekToOrdinal(10000).IsNotFound());
////////
// Now try some seeks by the value instead of position
/////////
EncodedKey* encoded_key = nullptr;
bool exact;
// Seek in between each key.
// (seek to "hello 0000.5" through "hello 9999.5")
string buf;
for (int i = 1; i < 10000; i++) {
mem.Reset();
buf = formatter(i - 1);
buf.append(".5");
s = Slice(buf);
EncodeStringKey(schema, s, &mem.arena, &encoded_key);
ASSERT_OK(iter->SeekAtOrAfter(*encoded_key, &exact));
ASSERT_FALSE(exact);
ASSERT_EQ(i, iter->GetCurrentOrdinal());
CopyOne<STRING>(iter.get(), &s, &mem);
ASSERT_EQ(formatter(i), s.ToString());
}
// Seek exactly to each key
// (seek to "hello 0000" through "hello 9999")
for (int i = 0; i < 9999; i++) {
mem.Reset();
buf = formatter(i);
s = Slice(buf);
EncodeStringKey(schema, s, &mem.arena, &encoded_key);
ASSERT_OK(iter->SeekAtOrAfter(*encoded_key, &exact));
ASSERT_TRUE(exact);
ASSERT_EQ(i, iter->GetCurrentOrdinal());
Slice read_back;
CopyOne<STRING>(iter.get(), &read_back, &mem);
ASSERT_EQ(read_back.ToString(), s.ToString());
}
// after last entry
// (seek to "hello 9999.x")
buf = formatter(9999) + ".x";
s = Slice(buf);
EncodeStringKey(schema, s, &mem.arena, &encoded_key);
EXPECT_TRUE(iter->SeekAtOrAfter(*encoded_key, &exact).IsNotFound());
// before first entry
// (seek to "hello 000", which falls before "hello 0000")
buf = formatter(0);
buf.resize(buf.size() - 1);
s = Slice(buf);
EncodeStringKey(schema, s, &mem.arena, &encoded_key);
ASSERT_OK(iter->SeekAtOrAfter(*encoded_key, &exact));
EXPECT_FALSE(exact);
EXPECT_EQ(0, iter->GetCurrentOrdinal());
CopyOne<STRING>(iter.get(), &s, &mem);
EXPECT_EQ(formatter(0), s.ToString());
// Seek to start of file by ordinal
ASSERT_OK(iter->SeekToFirst());
ASSERT_EQ(0, iter->GetCurrentOrdinal());
CopyOne<STRING>(iter.get(), &s, &mem);
ASSERT_EQ(formatter(0), s.ToString());
// Reseek to start and fetch all data.
// We fetch in 10 smaller chunks to avoid using too much RAM for the
// case where the values are large.
SelectionVector sel(10000);
ASSERT_OK(iter->SeekToFirst());
for (int i = 0; i < 10; i++) {
ScopedColumnBlock<STRING> cb(10000);
ColumnMaterializationContext cb_ctx = CreateNonDecoderEvalContext(&cb, &sel);
size_t n = 1000;
ASSERT_OK(iter->CopyNextValues(&n, &cb_ctx));
ASSERT_EQ(1000, n);
}
}
TEST_P(TestCFileBothCacheMemoryTypes, TestReadWriteStringsPrefixEncoding) {
RETURN_IF_NO_NVM_CACHE(GetParam());
TestReadWriteStrings(PREFIX_ENCODING);
}
// Read/Write test for dictionary encoded blocks
TEST_P(TestCFileBothCacheMemoryTypes, TestReadWriteStringsDictEncoding) {
RETURN_IF_NO_NVM_CACHE(GetParam());
TestReadWriteStrings(DICT_ENCODING);
}
// Regression test for properly handling cells that are larger
// than the index block and/or data block size.
//
// This test is disabled in TSAN because it's single-threaded anyway
// and runs extremely slowly with TSAN enabled.
#ifndef THREAD_SANITIZER
TEST_P(TestCFileBothCacheMemoryTypes, TestReadWriteLargeStrings) {
RETURN_IF_NO_NVM_CACHE(GetParam());
// Pad the values out to a length of ~65KB.
// We use this method instead of just a longer sprintf format since
// this is much more CPU-efficient (speeds up the test).
auto formatter = [](size_t val) {
string ret(66000, '0');
StringAppendF(&ret, "%010zd", val);
return ret;
};
TestReadWriteStrings(PLAIN_ENCODING, formatter);
if (AllowSlowTests()) {
TestReadWriteStrings(DICT_ENCODING, formatter);
TestReadWriteStrings(PREFIX_ENCODING, formatter);
}
}
#endif
// Test that metadata entries stored in the cfile are persisted.
TEST_P(TestCFileBothCacheMemoryTypes, TestMetadata) {
RETURN_IF_NO_NVM_CACHE(GetParam());
BlockId block_id;
// Write the file.
{
unique_ptr<WritableBlock> sink;
ASSERT_OK(fs_manager_->CreateNewBlock({}, &sink));
block_id = sink->id();
WriterOptions opts;
CFileWriter w(opts, GetTypeInfo(INT32), false, std::move(sink));
w.AddMetadataPair("key_in_header", "header value");
ASSERT_OK(w.Start());
uint32_t val = 1;
ASSERT_OK(w.AppendEntries(&val, 1));
w.AddMetadataPair("key_in_footer", "footer value");
ASSERT_OK(w.Finish());
}
// Read the file and ensure metadata is present.
{
unique_ptr<ReadableBlock> source;
ASSERT_OK(fs_manager_->OpenBlock(block_id, &source));
unique_ptr<CFileReader> reader;
ASSERT_OK(CFileReader::Open(std::move(source), ReaderOptions(), &reader));
string val;
ASSERT_TRUE(reader->GetMetadataEntry("key_in_header", &val));
ASSERT_EQ(val, "header value");
ASSERT_TRUE(reader->GetMetadataEntry("key_in_footer", &val));
ASSERT_EQ(val, "footer value");
ASSERT_FALSE(reader->GetMetadataEntry("not a key", &val));
// Test that, even though we didn't specify an encoding or compression, the
// resulting file has them explicitly set.
ASSERT_EQ(BIT_SHUFFLE, reader->type_encoding_info()->encoding_type());
ASSERT_EQ(NO_COMPRESSION, reader->footer().compression());
}
}
TEST_P(TestCFileBothCacheMemoryTypes, TestDefaultColumnIter) {
RETURN_IF_NO_NVM_CACHE(GetParam());
const int kNumItems = 64;
uint8_t non_null_bitmap[BitmapSize(kNumItems)];
uint32_t data[kNumItems];
// Test Int Default Value
uint32_t int_value = 15;
DefaultColumnValueIterator iter(GetTypeInfo(UINT32), &int_value);
ColumnBlock int_col(GetTypeInfo(UINT32), nullptr, data, kNumItems, nullptr);
SelectionVector sel(kNumItems);
ColumnMaterializationContext int_ctx = CreateNonDecoderEvalContext(&int_col, &sel);
ASSERT_OK(iter.Scan(&int_ctx));
for (size_t i = 0; i < int_col.nrows(); ++i) {
ASSERT_EQ(int_value, *reinterpret_cast<const uint32_t *>(int_col.cell_ptr(i)));
}
// Test Int Nullable Default Value
int_value = 321;
DefaultColumnValueIterator nullable_iter(GetTypeInfo(UINT32), &int_value);
ColumnBlock nullable_col(GetTypeInfo(UINT32), non_null_bitmap, data, kNumItems, nullptr);
ColumnMaterializationContext nullable_ctx = CreateNonDecoderEvalContext(&nullable_col, &sel);
ASSERT_OK(nullable_iter.Scan(&nullable_ctx));
for (size_t i = 0; i < nullable_col.nrows(); ++i) {
ASSERT_FALSE(nullable_col.is_null(i));
ASSERT_EQ(int_value, *reinterpret_cast<const uint32_t *>(nullable_col.cell_ptr(i)));
}
// Test NULL Default Value
DefaultColumnValueIterator null_iter(GetTypeInfo(UINT32), nullptr);
ColumnBlock null_col(GetTypeInfo(UINT32), non_null_bitmap, data, kNumItems, nullptr);
ColumnMaterializationContext null_ctx = CreateNonDecoderEvalContext(&null_col, &sel);
ASSERT_OK(null_iter.Scan(&null_ctx));
for (size_t i = 0; i < null_col.nrows(); ++i) {
ASSERT_TRUE(null_col.is_null(i));
}
// Test String Default Value
Slice str_data[kNumItems];
Slice str_value("Hello");
RowBlockMemory mem;
DefaultColumnValueIterator str_iter(GetTypeInfo(STRING), &str_value);
ColumnBlock str_col(GetTypeInfo(STRING), nullptr, str_data, kNumItems, &mem);
ColumnMaterializationContext str_ctx = CreateNonDecoderEvalContext(&str_col, &sel);
ASSERT_OK(str_iter.Scan(&str_ctx));
for (size_t i = 0; i < str_col.nrows(); ++i) {
ASSERT_EQ(str_value, *reinterpret_cast<const Slice *>(str_col.cell_ptr(i)));
}
}
TEST_P(TestCFileBothCacheMemoryTypes, TestAppendRaw) {
RETURN_IF_NO_NVM_CACHE(GetParam());
TestReadWriteRawBlocks(NO_COMPRESSION, 1000);
TestReadWriteRawBlocks(SNAPPY, 1000);
TestReadWriteRawBlocks(LZ4, 1000);
TestReadWriteRawBlocks(ZLIB, 1000);
}
TEST_P(TestCFileBothCacheMemoryTypes, TestChecksumFlags) {
RETURN_IF_NO_NVM_CACHE(GetParam());
for (bool write_checksums : {false, true}) {
for (bool verify_checksums : {false, true}) {
FLAGS_cfile_write_checksums = write_checksums;
FLAGS_cfile_verify_checksums = verify_checksums;
TestReadWriteRawBlocks(NO_COMPRESSION, 1000);
TestReadWriteRawBlocks(SNAPPY, 1000);
}
}
}
TEST_P(TestCFileBothCacheMemoryTypes, TestDataCorruption) {
RETURN_IF_NO_NVM_CACHE(GetParam());
FLAGS_cfile_write_checksums = true;
FLAGS_cfile_verify_checksums = true;
// Write some data
unique_ptr<WritableBlock> sink;
ASSERT_OK(fs_manager_->CreateNewBlock({}, &sink));
BlockId id = sink->id();
WriterOptions opts;
opts.write_posidx = true;
opts.write_validx = false;
opts.storage_attributes.cfile_block_size = FLAGS_cfile_test_block_size;
opts.storage_attributes.encoding = PLAIN_ENCODING;
CFileWriter w(opts, GetTypeInfo(STRING), false, std::move(sink));
w.AddMetadataPair("header_key", "header_value");
ASSERT_OK(w.Start());
vector<Slice> slices;
slices.emplace_back("HelloWorld");
ASSERT_OK(w.AppendRawBlock(slices, 1, nullptr, Slice(), "raw-data"));
ASSERT_OK(w.Finish());
// Get the final size of the data
unique_ptr<ReadableBlock> source;
ASSERT_OK(fs_manager_->OpenBlock(id, &source));
uint64_t file_size;
ASSERT_OK(source->Size(&file_size));
// Corrupt each bit and verify a corruption status is returned
for (size_t i = 0; i < file_size; i++) {
for (uint8_t flip = 0; flip < 8; flip++) {
Status s = CorruptAndReadBlock(id, i, flip);
ASSERT_TRUE(s.IsCorruption()) << s.ToString();
ASSERT_STR_MATCHES(s.ToString(), "block [0-9]+");
}
}
}
TEST_P(TestCFileBothCacheMemoryTypes, TestNullInts) {
RETURN_IF_NO_NVM_CACHE(GetParam());
UInt32DataGenerator<true> generator;
TestNullTypes(&generator, PLAIN_ENCODING, NO_COMPRESSION);
TestNullTypes(&generator, PLAIN_ENCODING, LZ4);
TestNullTypes(&generator, BIT_SHUFFLE, NO_COMPRESSION);
TestNullTypes(&generator, BIT_SHUFFLE, LZ4);
TestNullTypes(&generator, RLE, NO_COMPRESSION);
TestNullTypes(&generator, RLE, LZ4);
}
TEST_P(TestCFileBothCacheMemoryTypes, TestNullFloats) {
RETURN_IF_NO_NVM_CACHE(GetParam());
FPDataGenerator<FLOAT, true> generator;
TestNullTypes(&generator, PLAIN_ENCODING, NO_COMPRESSION);
TestNullTypes(&generator, BIT_SHUFFLE, NO_COMPRESSION);
}
TEST_P(TestCFileBothCacheMemoryTypes, TestNullPrefixStrings) {
RETURN_IF_NO_NVM_CACHE(GetParam());
StringDataGenerator<true> generator("hello %zu");
TestNullTypes(&generator, PLAIN_ENCODING, NO_COMPRESSION);
TestNullTypes(&generator, PLAIN_ENCODING, LZ4);
}
TEST_P(TestCFileBothCacheMemoryTypes, TestNullPlainStrings) {
RETURN_IF_NO_NVM_CACHE(GetParam());
StringDataGenerator<true> generator("hello %zu");
TestNullTypes(&generator, PREFIX_ENCODING, NO_COMPRESSION);
TestNullTypes(&generator, PREFIX_ENCODING, LZ4);
}
// Test for dictionary encoding
TEST_P(TestCFileBothCacheMemoryTypes, TestNullDictStrings) {
RETURN_IF_NO_NVM_CACHE(GetParam());
StringDataGenerator<true> generator("hello %zu");
TestNullTypes(&generator, DICT_ENCODING, NO_COMPRESSION);
TestNullTypes(&generator, DICT_ENCODING, LZ4);
}
TEST_P(TestCFileBothCacheMemoryTypes, TestReleaseBlock) {
RETURN_IF_NO_NVM_CACHE(GetParam());
unique_ptr<WritableBlock> sink;
ASSERT_OK(fs_manager_->CreateNewBlock({}, &sink));
ASSERT_EQ(WritableBlock::CLEAN, sink->state());
WriterOptions opts;
CFileWriter w(opts, GetTypeInfo(STRING), false, std::move(sink));
ASSERT_OK(w.Start());
BlockManager* bm = fs_manager_->block_manager();
unique_ptr<fs::BlockCreationTransaction> transaction = bm->NewCreationTransaction();
ASSERT_OK(w.FinishAndReleaseBlock(transaction.get()));
ASSERT_OK(transaction->CommitCreatedBlocks());
}
TEST_P(TestCFileBothCacheMemoryTypes, TestLazyInit) {
RETURN_IF_NO_NVM_CACHE(GetParam());
// Create a small test file.
BlockId block_id;
{
const int nrows = 1000;
StringDataGenerator<false> generator("hello %04d");
WriteTestFile(&generator, PREFIX_ENCODING, NO_COMPRESSION, nrows,
SMALL_BLOCKSIZE | WRITE_VALIDX, &block_id);
}
shared_ptr<MemTracker> tracker = MemTracker::CreateTracker(-1, "test");
int64_t initial_mem_usage = tracker->consumption();
// Open it using a "counting" readable block.
unique_ptr<ReadableBlock> block;
ASSERT_OK(fs_manager_->OpenBlock(block_id, &block));
size_t bytes_read = 0;
unique_ptr<ReadableBlock> count_block(
new CountingReadableBlock(std::move(block), &bytes_read));
ASSERT_EQ(initial_mem_usage, tracker->consumption());
// Lazily opening the cfile should not trigger any reads.
ReaderOptions opts;
opts.parent_mem_tracker = tracker;
unique_ptr<CFileReader> reader;
ASSERT_OK(CFileReader::OpenNoInit(std::move(count_block), opts, &reader));
ASSERT_EQ(0, bytes_read);
int64_t lazy_mem_usage = tracker->consumption();
ASSERT_GT(lazy_mem_usage, initial_mem_usage);
// But initializing it should (only the first time), and the reader's
// memory usage should increase.
ASSERT_OK(reader->Init(nullptr));
ASSERT_GT(bytes_read, 0);
size_t bytes_read_after_init = bytes_read;
ASSERT_OK(reader->Init(nullptr));
ASSERT_EQ(bytes_read_after_init, bytes_read);
ASSERT_GT(tracker->consumption(), lazy_mem_usage);
// And let's test non-lazy open for good measure; it should yield the
// same number of bytes read.
ASSERT_OK(fs_manager_->OpenBlock(block_id, &block));
bytes_read = 0;
count_block.reset(new CountingReadableBlock(std::move(block), &bytes_read));
ASSERT_OK(CFileReader::Open(std::move(count_block), ReaderOptions(), &reader));
ASSERT_EQ(bytes_read_after_init, bytes_read);
}
// Tests that the block cache keys used by CFileReaders are stable. That is,
// different reader instances operating on the same block should use the same
// block cache keys.
TEST_P(TestCFileBothCacheMemoryTypes, TestCacheKeysAreStable) {
RETURN_IF_NO_NVM_CACHE(GetParam());
// Set up block cache instrumentation.
MetricRegistry registry;
scoped_refptr<MetricEntity> entity(METRIC_ENTITY_server.Instantiate(&registry, "test_entity"));
BlockCache* cache = BlockCache::GetSingleton();
cache->StartInstrumentation(entity);
// Create a small test file.
BlockId block_id;
{
const int nrows = 1000;
StringDataGenerator<false> generator("hello %04d");
WriteTestFile(&generator, PREFIX_ENCODING, NO_COMPRESSION, nrows,
SMALL_BLOCKSIZE | WRITE_VALIDX, &block_id);
}
// Open and read from it twice, checking the block cache statistics.
for (int i = 0; i < 2; i++) {
unique_ptr<ReadableBlock> source;
ASSERT_OK(fs_manager_->OpenBlock(block_id, &source));
unique_ptr<CFileReader> reader;
ASSERT_OK(CFileReader::Open(std::move(source), ReaderOptions(), &reader));
unique_ptr<IndexTreeIterator> iter;
iter.reset(IndexTreeIterator::Create(nullptr, reader.get(), reader->posidx_root()));
ASSERT_OK(iter->SeekToFirst());
scoped_refptr<BlockHandle> bh;
ASSERT_OK(reader->ReadBlock(nullptr, iter->GetCurrentBlockPointer(),
CFileReader::CACHE_BLOCK,
&bh));
// The first time through, we miss in the seek and in the ReadBlock().
// But the second time through, both are hits, because we've got the same
// cache keys as before.
ASSERT_EQ(i * 2, down_cast<Counter*>(
entity->FindOrNull(METRIC_block_cache_hits_caching).get())->value());
}
}
// Inject failures in nvm allocation and ensure that we can still read a file.
TEST_P(TestCFileBothCacheMemoryTypes, TestNvmAllocationFailure) {
if (GetParam() != Cache::MemoryType::NVM) return;
RETURN_IF_NO_NVM_CACHE(GetParam());
FLAGS_nvm_cache_simulate_allocation_failure = true;
TestReadWriteFixedSizeTypes<UInt32DataGenerator<false> >(PLAIN_ENCODING);
}
TEST_P(TestCFileBothCacheMemoryTypes, TestValidateBlockCacheCapacity) {
RETURN_IF_NO_NVM_CACHE(GetParam());
FLAGS_force_block_cache_capacity = false;
FLAGS_block_cache_capacity_mb = 100000000; // very big number (100TB)
switch (GetParam()) {
case Cache::MemoryType::DRAM:
ASSERT_FALSE(kudu::cfile::ValidateBlockCacheCapacity());
break;
case Cache::MemoryType::NVM:
ASSERT_TRUE(kudu::cfile::ValidateBlockCacheCapacity());
break;
default:
LOG(FATAL) << "Unknown block cache type: "
<< static_cast<int16_t>(GetParam());
}
}
class TestCFileDifferentCodecs : public TestCFile,
public testing::WithParamInterface<CompressionType> {
};
INSTANTIATE_TEST_SUITE_P(Codecs, TestCFileDifferentCodecs,
::testing::Values(NO_COMPRESSION, SNAPPY, LZ4, ZLIB));
// Read/write a file with uncompressible data (random int32s)
TEST_P(TestCFileDifferentCodecs, TestUncompressible) {
auto codec = GetParam();
const size_t nrows = 1000000;
BlockId block_id;
size_t rdrows;
// Generate a plain-encoded file with random (uncompressible) data.
// This exercises the code path which short-circuits compression
// when the codec is not able to be effective on the input data.
{
RandomInt32DataGenerator int_gen;
WriteTestFile(&int_gen, PLAIN_ENCODING, codec, nrows,
NO_FLAGS, &block_id);
TimeReadFile(fs_manager_.get(), block_id, &rdrows);
ASSERT_EQ(nrows, rdrows);
}
}
} // namespace cfile
} // namespace kudu