blob: 0934ca9c92edbee3e163174b48d2273c642623d4 [file] [log] [blame]
// Copyright (c) 2016, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#if defined(LUA) && !defined(ROCKSDB_LITE)
#include "rocksdb/utilities/lua/rocks_lua_compaction_filter.h"
extern "C" {
#include <luaconf.h>
}
#include "rocksdb/compaction_filter.h"
namespace rocksdb {
namespace lua {
const std::string kFilterFunctionName = "Filter";
const std::string kNameFunctionName = "Name";
void RocksLuaCompactionFilter::LogLuaError(const char* format, ...) const {
if (options_.error_log.get() != nullptr &&
error_count_ < options_.error_limit_per_filter) {
error_count_++;
va_list ap;
va_start(ap, format);
options_.error_log->Logv(InfoLogLevel::ERROR_LEVEL, format, ap);
va_end(ap);
}
}
bool RocksLuaCompactionFilter::Filter(int level, const Slice& key,
const Slice& existing_value,
std::string* new_value,
bool* value_changed) const {
auto* lua_state = lua_state_wrapper_.GetLuaState();
// push the right function into the lua stack
lua_getglobal(lua_state, kFilterFunctionName.c_str());
int error_no = 0;
int num_input_values;
int num_return_values;
if (options_.ignore_value == false) {
// push input arguments into the lua stack
lua_pushnumber(lua_state, level);
lua_pushlstring(lua_state, key.data(), key.size());
lua_pushlstring(lua_state, existing_value.data(), existing_value.size());
num_input_values = 3;
num_return_values = 3;
} else {
// If ignore_value is set to true, then we only put two arguments
// and expect one return value
lua_pushnumber(lua_state, level);
lua_pushlstring(lua_state, key.data(), key.size());
num_input_values = 2;
num_return_values = 1;
}
// perform the lua call
if ((error_no =
lua_pcall(lua_state, num_input_values, num_return_values, 0)) != 0) {
LogLuaError("[Lua] Error(%d) in Filter function --- %s", error_no,
lua_tostring(lua_state, -1));
// pops out the lua error from stack
lua_pop(lua_state, 1);
return false;
}
// As lua_pcall went successfully, it can be guaranteed that the top
// three elements in the Lua stack are the three returned values.
bool has_error = false;
const int kIndexIsFiltered = -num_return_values;
const int kIndexValueChanged = -num_return_values + 1;
const int kIndexNewValue = -num_return_values + 2;
// check the types of three return values
// is_filtered
if (!lua_isboolean(lua_state, kIndexIsFiltered)) {
LogLuaError(
"[Lua] Error in Filter function -- "
"1st return value (is_filtered) is not a boolean "
"while a boolean is expected.");
has_error = true;
}
if (options_.ignore_value == false) {
// value_changed
if (!lua_isboolean(lua_state, kIndexValueChanged)) {
LogLuaError(
"[Lua] Error in Filter function -- "
"2nd return value (value_changed) is not a boolean "
"while a boolean is expected.");
has_error = true;
}
// new_value
if (!lua_isstring(lua_state, kIndexNewValue)) {
LogLuaError(
"[Lua] Error in Filter function -- "
"3rd return value (new_value) is not a string "
"while a string is expected.");
has_error = true;
}
}
if (has_error) {
lua_pop(lua_state, num_return_values);
return false;
}
// Fetch the return values
bool is_filtered = false;
if (!has_error) {
is_filtered = lua_toboolean(lua_state, kIndexIsFiltered);
if (options_.ignore_value == false) {
*value_changed = lua_toboolean(lua_state, kIndexValueChanged);
if (*value_changed) {
const char* new_value_buf = lua_tostring(lua_state, kIndexNewValue);
const size_t new_value_size = lua_strlen(lua_state, kIndexNewValue);
// Note that any string that lua_tostring returns always has a zero at
// its end, bu/t it can have other zeros inside it
assert(new_value_buf[new_value_size] == '\0');
assert(strlen(new_value_buf) <= new_value_size);
new_value->assign(new_value_buf, new_value_size);
}
} else {
*value_changed = false;
}
}
// pops the three return values.
lua_pop(lua_state, num_return_values);
return is_filtered;
}
const char* RocksLuaCompactionFilter::Name() const {
if (name_ != "") {
return name_.c_str();
}
auto* lua_state = lua_state_wrapper_.GetLuaState();
// push the right function into the lua stack
lua_getglobal(lua_state, kNameFunctionName.c_str());
// perform the call (0 arguments, 1 result)
int error_no;
if ((error_no = lua_pcall(lua_state, 0, 1, 0)) != 0) {
LogLuaError("[Lua] Error(%d) in Name function --- %s", error_no,
lua_tostring(lua_state, -1));
// pops out the lua error from stack
lua_pop(lua_state, 1);
return name_.c_str();
}
// check the return value
if (!lua_isstring(lua_state, -1)) {
LogLuaError(
"[Lua] Error in Name function -- "
"return value is not a string while string is expected");
} else {
const char* name_buf = lua_tostring(lua_state, -1);
const size_t name_size __attribute__((unused)) = lua_strlen(lua_state, -1);
assert(name_buf[name_size] == '\0');
assert(strlen(name_buf) <= name_size);
name_ = name_buf;
}
lua_pop(lua_state, 1);
return name_.c_str();
}
/* Not yet supported
bool RocksLuaCompactionFilter::FilterMergeOperand(
int level, const Slice& key, const Slice& operand) const {
auto* lua_state = lua_state_wrapper_.GetLuaState();
// push the right function into the lua stack
lua_getglobal(lua_state, "FilterMergeOperand");
// push input arguments into the lua stack
lua_pushnumber(lua_state, level);
lua_pushlstring(lua_state, key.data(), key.size());
lua_pushlstring(lua_state, operand.data(), operand.size());
// perform the call (3 arguments, 1 result)
int error_no;
if ((error_no = lua_pcall(lua_state, 3, 1, 0)) != 0) {
LogLuaError("[Lua] Error(%d) in FilterMergeOperand function --- %s",
error_no, lua_tostring(lua_state, -1));
// pops out the lua error from stack
lua_pop(lua_state, 1);
return false;
}
bool is_filtered = false;
// check the return value
if (!lua_isboolean(lua_state, -1)) {
LogLuaError("[Lua] Error in FilterMergeOperand function -- "
"return value is not a boolean while boolean is expected");
} else {
is_filtered = lua_toboolean(lua_state, -1);
}
lua_pop(lua_state, 1);
return is_filtered;
}
*/
bool RocksLuaCompactionFilter::IgnoreSnapshots() const {
return options_.ignore_snapshots;
}
RocksLuaCompactionFilterFactory::RocksLuaCompactionFilterFactory(
const RocksLuaCompactionFilterOptions opt)
: opt_(opt) {
auto filter = CreateCompactionFilter(CompactionFilter::Context());
name_ = std::string("RocksLuaCompactionFilterFactory::") +
std::string(filter->Name());
}
std::unique_ptr<CompactionFilter>
RocksLuaCompactionFilterFactory::CreateCompactionFilter(
const CompactionFilter::Context& context) {
std::lock_guard<std::mutex> lock(opt_mutex_);
return std::unique_ptr<CompactionFilter>(new RocksLuaCompactionFilter(opt_));
}
std::string RocksLuaCompactionFilterFactory::GetScript() {
std::lock_guard<std::mutex> lock(opt_mutex_);
return opt_.lua_script;
}
void RocksLuaCompactionFilterFactory::SetScript(const std::string& new_script) {
std::lock_guard<std::mutex> lock(opt_mutex_);
opt_.lua_script = new_script;
}
const char* RocksLuaCompactionFilterFactory::Name() const {
return name_.c_str();
}
} // namespace lua
} // namespace rocksdb
#endif // defined(LUA) && !defined(ROCKSDB_LITE)