blob: 025acaf6d612214981be9e3c63d0aa168b904e9f [file] [log] [blame]
// Copyright (c) 2016, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include <stdio.h>
#if !defined(ROCKSDB_LITE)
#if defined(LUA)
#include <string>
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/utilities/lua/rocks_lua_compaction_filter.h"
#include "util/testharness.h"
namespace rocksdb {
class StopOnErrorLogger : public Logger {
public:
using Logger::Logv;
virtual void Logv(const char* format, va_list ap) override {
vfprintf(stderr, format, ap);
fprintf(stderr, "\n");
FAIL();
}
};
class RocksLuaTest : public testing::Test {
public:
RocksLuaTest() : rnd_(301) {
temp_dir_ = test::TmpDir(Env::Default());
db_ = nullptr;
}
std::string RandomString(int len) {
std::string res;
for (int i = 0; i < len; ++i) {
res += rnd_.Uniform(26) + 'a';
}
return res;
}
void CreateDBWithLuaCompactionFilter(
const lua::RocksLuaCompactionFilterOptions& lua_opt,
const std::string& db_path,
std::unordered_map<std::string, std::string>* kvs,
const int kNumFlushes = 5,
std::shared_ptr<rocksdb::lua::RocksLuaCompactionFilterFactory>*
output_factory = nullptr) {
const int kKeySize = 10;
const int kValueSize = 50;
const int kKeysPerFlush = 2;
auto factory =
std::make_shared<rocksdb::lua::RocksLuaCompactionFilterFactory>(
lua_opt);
if (output_factory != nullptr) {
*output_factory = factory;
}
options_ = Options();
options_.create_if_missing = true;
options_.compaction_filter_factory = factory;
options_.disable_auto_compactions = true;
options_.max_bytes_for_level_base =
(kKeySize + kValueSize) * kKeysPerFlush * 2;
options_.max_bytes_for_level_multiplier = 2;
options_.target_file_size_base = (kKeySize + kValueSize) * kKeysPerFlush;
options_.level0_file_num_compaction_trigger = 2;
DestroyDB(db_path, options_);
ASSERT_OK(DB::Open(options_, db_path, &db_));
for (int f = 0; f < kNumFlushes; ++f) {
for (int i = 0; i < kKeysPerFlush; ++i) {
std::string key = RandomString(kKeySize);
std::string value = RandomString(kValueSize);
kvs->insert({key, value});
ASSERT_OK(db_->Put(WriteOptions(), key, value));
}
db_->Flush(FlushOptions());
}
}
~RocksLuaTest() {
if (db_) {
delete db_;
}
}
std::string temp_dir_;
DB* db_;
Random rnd_;
Options options_;
};
TEST_F(RocksLuaTest, Default) {
// If nothing is set in the LuaCompactionFilterOptions, then
// RocksDB will keep all the key / value pairs, but it will also
// print our error log indicating failure.
std::string db_path = temp_dir_ + "/rocks_lua_test";
lua::RocksLuaCompactionFilterOptions lua_opt;
std::unordered_map<std::string, std::string> kvs;
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
for (auto const& entry : kvs) {
std::string value;
ASSERT_OK(db_->Get(ReadOptions(), entry.first, &value));
ASSERT_EQ(value, entry.second);
}
}
TEST_F(RocksLuaTest, KeepsAll) {
std::string db_path = temp_dir_ + "/rocks_lua_test";
lua::RocksLuaCompactionFilterOptions lua_opt;
lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
// keeps all the key value pairs
lua_opt.lua_script =
"function Filter(level, key, existing_value)\n"
" return false, false, \"\"\n"
"end\n"
"\n"
"function FilterMergeOperand(level, key, operand)\n"
" return false\n"
"end\n"
"function Name()\n"
" return \"KeepsAll\"\n"
"end\n"
"\n";
std::unordered_map<std::string, std::string> kvs;
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
for (auto const& entry : kvs) {
std::string value;
ASSERT_OK(db_->Get(ReadOptions(), entry.first, &value));
ASSERT_EQ(value, entry.second);
}
}
TEST_F(RocksLuaTest, GetName) {
std::string db_path = temp_dir_ + "/rocks_lua_test";
lua::RocksLuaCompactionFilterOptions lua_opt;
lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
const std::string kScriptName = "SimpleLuaCompactionFilter";
lua_opt.lua_script =
std::string(
"function Filter(level, key, existing_value)\n"
" return false, false, \"\"\n"
"end\n"
"\n"
"function FilterMergeOperand(level, key, operand)\n"
" return false\n"
"end\n"
"function Name()\n"
" return \"") + kScriptName + "\"\n"
"end\n"
"\n";
std::shared_ptr<CompactionFilterFactory> factory =
std::make_shared<lua::RocksLuaCompactionFilterFactory>(lua_opt);
std::string factory_name(factory->Name());
ASSERT_NE(factory_name.find(kScriptName), std::string::npos);
}
TEST_F(RocksLuaTest, RemovesAll) {
std::string db_path = temp_dir_ + "/rocks_lua_test";
lua::RocksLuaCompactionFilterOptions lua_opt;
lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
// removes all the key value pairs
lua_opt.lua_script =
"function Filter(level, key, existing_value)\n"
" return true, false, \"\"\n"
"end\n"
"\n"
"function FilterMergeOperand(level, key, operand)\n"
" return false\n"
"end\n"
"function Name()\n"
" return \"RemovesAll\"\n"
"end\n"
"\n";
std::unordered_map<std::string, std::string> kvs;
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
// Issue full compaction and expect nothing is in the DB.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
for (auto const& entry : kvs) {
std::string value;
auto s = db_->Get(ReadOptions(), entry.first, &value);
ASSERT_TRUE(s.IsNotFound());
}
}
TEST_F(RocksLuaTest, FilterByKey) {
std::string db_path = temp_dir_ + "/rocks_lua_test";
lua::RocksLuaCompactionFilterOptions lua_opt;
lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
// removes all keys whose initial is less than 'r'
lua_opt.lua_script =
"function Filter(level, key, existing_value)\n"
" if key:sub(1,1) < 'r' then\n"
" return true, false, \"\"\n"
" end\n"
" return false, false, \"\"\n"
"end\n"
"\n"
"function FilterMergeOperand(level, key, operand)\n"
" return false\n"
"end\n"
"function Name()\n"
" return \"KeepsAll\"\n"
"end\n";
std::unordered_map<std::string, std::string> kvs;
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
// Issue full compaction and expect nothing is in the DB.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
for (auto const& entry : kvs) {
std::string value;
auto s = db_->Get(ReadOptions(), entry.first, &value);
if (entry.first[0] < 'r') {
ASSERT_TRUE(s.IsNotFound());
} else {
ASSERT_TRUE(s.ok());
ASSERT_TRUE(value == entry.second);
}
}
}
TEST_F(RocksLuaTest, FilterByValue) {
std::string db_path = temp_dir_ + "/rocks_lua_test";
lua::RocksLuaCompactionFilterOptions lua_opt;
lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
// removes all values whose initial is less than 'r'
lua_opt.lua_script =
"function Filter(level, key, existing_value)\n"
" if existing_value:sub(1,1) < 'r' then\n"
" return true, false, \"\"\n"
" end\n"
" return false, false, \"\"\n"
"end\n"
"\n"
"function FilterMergeOperand(level, key, operand)\n"
" return false\n"
"end\n"
"function Name()\n"
" return \"FilterByValue\"\n"
"end\n"
"\n";
std::unordered_map<std::string, std::string> kvs;
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
// Issue full compaction and expect nothing is in the DB.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
for (auto const& entry : kvs) {
std::string value;
auto s = db_->Get(ReadOptions(), entry.first, &value);
if (entry.second[0] < 'r') {
ASSERT_TRUE(s.IsNotFound());
} else {
ASSERT_TRUE(s.ok());
ASSERT_EQ(value, entry.second);
}
}
}
TEST_F(RocksLuaTest, ChangeValue) {
std::string db_path = temp_dir_ + "/rocks_lua_test";
lua::RocksLuaCompactionFilterOptions lua_opt;
lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
// Replace all values by their reversed key
lua_opt.lua_script =
"function Filter(level, key, existing_value)\n"
" return false, true, key:reverse()\n"
"end\n"
"\n"
"function FilterMergeOperand(level, key, operand)\n"
" return false\n"
"end\n"
"function Name()\n"
" return \"ChangeValue\"\n"
"end\n"
"\n";
std::unordered_map<std::string, std::string> kvs;
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
// Issue full compaction and expect nothing is in the DB.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
for (auto const& entry : kvs) {
std::string value;
ASSERT_OK(db_->Get(ReadOptions(), entry.first, &value));
std::string new_value = entry.first;
std::reverse(new_value.begin(), new_value.end());
ASSERT_EQ(value, new_value);
}
}
TEST_F(RocksLuaTest, ConditionallyChangeAndFilterValue) {
std::string db_path = temp_dir_ + "/rocks_lua_test";
lua::RocksLuaCompactionFilterOptions lua_opt;
lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
// Performs the following logic:
// If key[0] < 'h' --> replace value by reverse key
// If key[0] >= 'r' --> keep the original key value
// Otherwise, filter the key value
lua_opt.lua_script =
"function Filter(level, key, existing_value)\n"
" if key:sub(1,1) < 'h' then\n"
" return false, true, key:reverse()\n"
" elseif key:sub(1,1) < 'r' then\n"
" return true, false, \"\"\n"
" end\n"
" return false, false, \"\"\n"
"end\n"
"function Name()\n"
" return \"ConditionallyChangeAndFilterValue\"\n"
"end\n"
"\n";
std::unordered_map<std::string, std::string> kvs;
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
// Issue full compaction and expect nothing is in the DB.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
for (auto const& entry : kvs) {
std::string value;
auto s = db_->Get(ReadOptions(), entry.first, &value);
if (entry.first[0] < 'h') {
ASSERT_TRUE(s.ok());
std::string new_value = entry.first;
std::reverse(new_value.begin(), new_value.end());
ASSERT_EQ(value, new_value);
} else if (entry.first[0] < 'r') {
ASSERT_TRUE(s.IsNotFound());
} else {
ASSERT_TRUE(s.ok());
ASSERT_EQ(value, entry.second);
}
}
}
TEST_F(RocksLuaTest, DynamicChangeScript) {
std::string db_path = temp_dir_ + "/rocks_lua_test";
lua::RocksLuaCompactionFilterOptions lua_opt;
lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
// keeps all the key value pairs
lua_opt.lua_script =
"function Filter(level, key, existing_value)\n"
" return false, false, \"\"\n"
"end\n"
"\n"
"function FilterMergeOperand(level, key, operand)\n"
" return false\n"
"end\n"
"function Name()\n"
" return \"KeepsAll\"\n"
"end\n"
"\n";
std::unordered_map<std::string, std::string> kvs;
std::shared_ptr<rocksdb::lua::RocksLuaCompactionFilterFactory> factory;
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs, 30, &factory);
uint64_t count = 0;
ASSERT_TRUE(db_->GetIntProperty(
rocksdb::DB::Properties::kNumEntriesActiveMemTable, &count));
ASSERT_EQ(count, 0);
ASSERT_TRUE(db_->GetIntProperty(
rocksdb::DB::Properties::kNumEntriesImmMemTables, &count));
ASSERT_EQ(count, 0);
CompactRangeOptions cr_opt;
cr_opt.bottommost_level_compaction =
rocksdb::BottommostLevelCompaction::kForce;
// Issue full compaction and expect everything is in the DB.
ASSERT_OK(db_->CompactRange(cr_opt, nullptr, nullptr));
for (auto const& entry : kvs) {
std::string value;
ASSERT_OK(db_->Get(ReadOptions(), entry.first, &value));
ASSERT_EQ(value, entry.second);
}
// change the lua script to removes all the key value pairs
factory->SetScript(
"function Filter(level, key, existing_value)\n"
" return true, false, \"\"\n"
"end\n"
"\n"
"function FilterMergeOperand(level, key, operand)\n"
" return false\n"
"end\n"
"function Name()\n"
" return \"RemovesAll\"\n"
"end\n"
"\n");
{
std::string key = "another-key";
std::string value = "another-value";
kvs.insert({key, value});
ASSERT_OK(db_->Put(WriteOptions(), key, value));
db_->Flush(FlushOptions());
}
cr_opt.change_level = true;
cr_opt.target_level = 5;
// Issue full compaction and expect nothing is in the DB.
ASSERT_OK(db_->CompactRange(cr_opt, nullptr, nullptr));
for (auto const& entry : kvs) {
std::string value;
auto s = db_->Get(ReadOptions(), entry.first, &value);
ASSERT_TRUE(s.IsNotFound());
}
}
TEST_F(RocksLuaTest, LuaConditionalTypeError) {
std::string db_path = temp_dir_ + "/rocks_lua_test";
lua::RocksLuaCompactionFilterOptions lua_opt;
// Filter() error when input key's initial >= 'r'
lua_opt.lua_script =
"function Filter(level, key, existing_value)\n"
" if existing_value:sub(1,1) >= 'r' then\n"
" return true, 2, \"\" -- incorrect type of 2nd return value\n"
" end\n"
" return true, false, \"\"\n"
"end\n"
"\n"
"function FilterMergeOperand(level, key, operand)\n"
" return false\n"
"end\n"
"function Name()\n"
" return \"BuggyCode\"\n"
"end\n"
"\n";
std::unordered_map<std::string, std::string> kvs;
// Create DB with 10 files
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs, 10);
// Issue full compaction and expect all keys which initial is < 'r'
// will be deleted as we keep the key value when we hit an error.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
for (auto const& entry : kvs) {
std::string value;
auto s = db_->Get(ReadOptions(), entry.first, &value);
if (entry.second[0] < 'r') {
ASSERT_TRUE(s.IsNotFound());
} else {
ASSERT_TRUE(s.ok());
ASSERT_EQ(value, entry.second);
}
}
}
} // namespace rocksdb
int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else
int main(int argc, char** argv) {
printf("LUA_PATH is not set. Ignoring the test.\n");
}
#endif // defined(LUA)
#else
int main(int argc, char** argv) {
printf("Lua is not supported in RocksDBLite. Ignoring the test.\n");
}
#endif // !defined(ROCKSDB_LITE)