blob: d6cf171a018330cc6fc6545921a3a412351f92e2 [file] [log] [blame]
#include <glog/logging.h>
#include <rocksdb/perf_context.h>
#include <rocksdb/iostats_context.h>
#include <chrono>
#include <utility>
#include "util.h"
#include "redis_cmd.h"
#include "redis_reply.h"
#include "redis_request.h"
#include "redis_connection.h"
#include "server.h"
namespace Redis {
const size_t PROTO_INLINE_MAX_SIZE = 16 * 1024L;
const size_t PROTO_BULK_MAX_SIZE = 128 * 1024L * 1024L;
const size_t PROTO_MAX_MULTI_BULKS = 8 * 1024L;
Status Request::Tokenize(evbuffer *input) {
char *line;
size_t len;
while (true) {
switch (state_) {
case ArrayLen:
line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF_STRICT);
if (!line || len <= 0) return Status::OK();
svr_->stats_.IncrInbondBytes(len);
if (line[0] == '*') {
try {
multi_bulk_len_ = std::stoull(std::string(line + 1, len-1));
} catch (std::exception &e) {
free(line);
return Status(Status::NotOK, "Protocol error: expect integer");
}
if (multi_bulk_len_ > PROTO_MAX_MULTI_BULKS) {
free(line);
return Status(Status::NotOK, "Protocol error: too many bulk strings");
}
state_ = BulkLen;
} else {
if (len > PROTO_INLINE_MAX_SIZE) {
free(line);
return Status(Status::NotOK, "Protocol error: too big inline request");
}
Util::Split(std::string(line, len), " \t", &tokens_);
commands_.push_back(std::move(tokens_));
state_ = ArrayLen;
}
free(line);
break;
case BulkLen:
line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF_STRICT);
if (!line || len <= 0) return Status::OK();
svr_->stats_.IncrInbondBytes(len);
if (line[0] != '$') {
free(line);
return Status(Status::NotOK, "Protocol error: expect '$'");
}
try {
bulk_len_ = std::stoull(std::string(line + 1, len-1));
} catch (std::exception &e) {
free(line);
return Status(Status::NotOK, "Protocol error: expect integer");
}
if (bulk_len_ > PROTO_BULK_MAX_SIZE) {
free(line);
return Status(Status::NotOK, "Protocol error: too big bulk string");
}
free(line);
state_ = BulkData;
break;
case BulkData:
if (evbuffer_get_length(input) < bulk_len_ + 2) return Status::OK();
char *data = reinterpret_cast<char *>(evbuffer_pullup(input, bulk_len_ + 2));
tokens_.emplace_back(data, bulk_len_);
evbuffer_drain(input, bulk_len_ + 2);
svr_->stats_.IncrInbondBytes(bulk_len_ + 2);
--multi_bulk_len_;
if (multi_bulk_len_ == 0) {
state_ = ArrayLen;
commands_.push_back(std::move(tokens_));
tokens_.clear();
} else {
state_ = BulkLen;
}
break;
}
}
}
bool Request::inCommandWhitelist(const std::string &command) {
std::vector<std::string> whitelist = {"auth"};
for (const auto &allow_command : whitelist) {
if (allow_command == command) return true;
}
return false;
}
bool Request::turnOnProfilingIfNeed(const std::string &cmd) {
auto config = svr_->GetConfig();
if (config->profiling_sample_ratio == 0) return false;
if (!config->profiling_sample_all_commands &&
config->profiling_sample_commands.find(cmd) == config->profiling_sample_commands.end()) {
return false;
}
if (config->profiling_sample_ratio == 100 ||
std::rand() % 100 <= config->profiling_sample_ratio) {
rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
rocksdb::get_perf_context()->Reset();
rocksdb::get_iostats_context()->Reset();
return true;
}
return false;
}
void Request::recordProfilingSampleIfNeed(const std::string &cmd, uint64_t duration) {
int threshold = svr_->GetConfig()->profiling_sample_record_threshold_ms;
if (threshold > 0 && static_cast<int>(duration/1000) < threshold) {
rocksdb::SetPerfLevel(rocksdb::PerfLevel::kDisable);
return;
}
std::string perf_context = rocksdb::get_perf_context()->ToString(true);
std::string iostats_context = rocksdb::get_iostats_context()->ToString(true);
rocksdb::SetPerfLevel(rocksdb::PerfLevel::kDisable);
if (perf_context.empty()) return; // request without db operation
svr_->GetPerfLog()->PushEntry({cmd, std::move(perf_context),
std::move(iostats_context), duration, 0});
}
void Request::ExecuteCommands(Connection *conn) {
if (commands_.empty()) return;
Config *config = svr_->GetConfig();
std::string reply;
for (auto &cmd_tokens : commands_) {
if (conn->IsFlagEnabled(Redis::Connection::kCloseAfterReply)) break;
if (conn->GetNamespace().empty()) {
if (!config->requirepass.empty() && Util::ToLower(cmd_tokens.front()) != "auth") {
conn->Reply(Redis::Error("NOAUTH Authentication required."));
continue;
}
conn->BecomeAdmin();
conn->SetNamespace(kDefaultNamespace);
}
auto s = LookupCommand(cmd_tokens.front(), &conn->current_cmd_, conn->IsRepl());
if (!s.IsOK()) {
conn->Reply(Redis::Error("ERR unknown command"));
continue;
}
if (svr_->IsLoading() && !inCommandWhitelist(conn->current_cmd_->Name())) {
conn->Reply(Redis::Error("ERR restoring the db from backup"));
break;
}
int arity = conn->current_cmd_->GetArity();
int tokens = static_cast<int>(cmd_tokens.size());
if ((arity > 0 && tokens != arity)
|| (arity < 0 && tokens < -arity)) {
conn->Reply(Redis::Error("ERR wrong number of arguments"));
continue;
}
conn->current_cmd_->SetArgs(cmd_tokens);
s = conn->current_cmd_->Parse(cmd_tokens);
if (!s.IsOK()) {
conn->Reply(Redis::Error(s.Msg()));
continue;
}
if (config->slave_readonly && svr_->IsSlave() && conn->current_cmd_->IsWrite()) {
conn->Reply(Redis::Error("READONLY You can't write against a read only slave."));
continue;
}
conn->SetLastCmd(conn->current_cmd_->Name());
svr_->stats_.IncrCalls(conn->current_cmd_->Name());
auto start = std::chrono::high_resolution_clock::now();
bool is_profiling = turnOnProfilingIfNeed(conn->current_cmd_->Name());
svr_->IncrExecutingCommandNum();
s = conn->current_cmd_->Execute(svr_, conn, &reply);
svr_->DecrExecutingCommandNum();
auto end = std::chrono::high_resolution_clock::now();
uint64_t duration = std::chrono::duration_cast<std::chrono::microseconds>(end-start).count();
if (is_profiling) recordProfilingSampleIfNeed(conn->current_cmd_->Name(), duration);
svr_->SlowlogPushEntryIfNeeded(conn->current_cmd_->Args(), duration);
svr_->stats_.IncrLatency(static_cast<uint64_t>(duration), conn->current_cmd_->Name());
svr_->FeedMonitorConns(conn, cmd_tokens);
if (!s.IsOK()) {
conn->Reply(Redis::Error("ERR " + s.Msg()));
LOG(ERROR) << "[request] Failed to execute command: " << conn->current_cmd_->Name()
<< ", encounter err: " << s.Msg();
continue;
}
if (!reply.empty()) conn->Reply(reply);
}
commands_.clear();
}
} // namespace Redis