| #include <arpa/inet.h> |
| #include <fcntl.h> |
| #include <glog/logging.h> |
| #include <sys/socket.h> |
| #include <algorithm> |
| #include <cctype> |
| #include <chrono> |
| #include <thread> |
| #include <utility> |
| #include <memory> |
| |
| #include "redis_db.h" |
| #include "redis_cmd.h" |
| #include "redis_hash.h" |
| #include "redis_bitmap.h" |
| #include "redis_list.h" |
| #include "redis_request.h" |
| #include "redis_connection.h" |
| #include "redis_set.h" |
| #include "redis_string.h" |
| #include "redis_zset.h" |
| #include "redis_pubsub.h" |
| #include "replication.h" |
| #include "util.h" |
| #include "storage.h" |
| #include "worker.h" |
| #include "server.h" |
| |
| namespace Redis { |
| |
| const char *kValueNotInterger = "value is not an integer or out of range"; |
| |
| class CommandAuth : public Commander { |
| public: |
| CommandAuth() : Commander("auth", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Config *config = svr->GetConfig(); |
| auto iter = config->tokens.find(args_[1]); |
| if (iter != config->tokens.end()) { |
| conn->SetNamespace(iter->second); |
| conn->BecomeUser(); |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| const std::string requirepass = config->requirepass; |
| if (!requirepass.empty() && args_[1] != requirepass) { |
| *output = Redis::Error("ERR invaild password"); |
| return Status::OK(); |
| } |
| conn->SetNamespace(kDefaultNamespace); |
| conn->BecomeAdmin(); |
| if (requirepass.empty()) { |
| *output = Redis::Error("ERR Client sent AUTH, but no password is set"); |
| } else { |
| *output = Redis::SimpleString("OK"); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandNamespace : public Commander { |
| public: |
| CommandNamespace() : Commander("namespace", -3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (!conn->IsAdmin()) { |
| *output = Redis::Error("only administrator can use namespace command"); |
| return Status::OK(); |
| } |
| Config *config = svr->GetConfig(); |
| if (args_.size() == 3 && args_[1] == "get") { |
| if (args_[2] == "*") { |
| std::vector<std::string> namespaces; |
| auto tokens = config->tokens; |
| for (auto iter = tokens.begin(); iter != tokens.end(); iter++) { |
| namespaces.emplace_back(iter->second); // namespace |
| namespaces.emplace_back(iter->first); // token |
| } |
| *output = Redis::MultiBulkString(namespaces); |
| } else { |
| std::string token; |
| config->GetNamespace(args_[2], &token); |
| *output = Redis::BulkString(token); |
| } |
| } else if (args_.size() == 4 && args_[1] == "set") { |
| Status s = config->SetNamespace(args_[2], args_[3]); |
| *output = s.IsOK() ? Redis::SimpleString("OK") : Redis::Error(s.Msg()); |
| LOG(WARNING) << "Updated namespace: " << args_[2] << " with token: " << args_[3] |
| << ", addr: " << conn->GetAddr() << ", result: " << s.Msg(); |
| } else if (args_.size() == 4 && args_[1] == "add") { |
| Status s = config->AddNamespace(args_[2], args_[3]); |
| *output = s.IsOK() ? Redis::SimpleString("OK") : Redis::Error(s.Msg()); |
| LOG(WARNING) << "New namespace: " << args_[2] << " with token: " << args_[3] |
| << ", addr: " << conn->GetAddr() << ", result: " << s.Msg(); |
| } else if (args_.size() == 3 && args_[1] == "del") { |
| Status s = config->DelNamespace(args_[2]); |
| *output = s.IsOK() ? Redis::SimpleString("OK") : Redis::Error(s.Msg()); |
| LOG(WARNING) << "Deleted namespace: " << args_[2] |
| << ", addr: " << conn->GetAddr() << ", result: " << s.Msg(); |
| } else { |
| *output = Redis::Error( |
| "NAMESPACE subcommand must be one of GET, SET, DEL, ADD"); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandKeys : public Commander { |
| public: |
| CommandKeys() : Commander("keys", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::string prefix = args_[1]; |
| std::vector<std::string> keys; |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| if (prefix == "*") { |
| redis.Keys(std::string(), &keys); |
| } else { |
| if (prefix[prefix.size() - 1] != '*') { |
| *output = Redis::Error("ERR only keys prefix match was supported"); |
| return Status::OK(); |
| } |
| redis.Keys(prefix.substr(0, prefix.size() - 1), &keys); |
| } |
| *output = Redis::MultiBulkString(keys); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandFlushDB : public Commander { |
| public: |
| CommandFlushDB() : Commander("flushdb", 1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = redis.FlushDB(); |
| LOG(WARNING) << "DB keys in namespce: " << conn->GetNamespace() |
| << " was flused, addr: " << conn->GetAddr(); |
| if (s.ok()) { |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| }; |
| |
| class CommandPing : public Commander { |
| public: |
| CommandPing() : Commander("ping", 1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| *output = Redis::SimpleString("PONG"); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSelect: public Commander { |
| public: |
| CommandSelect() : Commander("select", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandConfig : public Commander { |
| public: |
| CommandConfig() : Commander("config", -2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (!conn->IsAdmin()) { |
| *output = Redis::Error("only administrator can use config command"); |
| return Status::OK(); |
| } |
| |
| Config *config = svr->GetConfig(); |
| if (args_.size() == 2 && Util::ToLower(args_[1]) == "rewrite") { |
| Status s = config->Rewrite(); |
| if (!s.IsOK()) return Status(Status::RedisExecErr, s.Msg()); |
| *output = Redis::SimpleString("OK"); |
| LOG(INFO) << "# CONFIG REWRITE executed with success"; |
| } else if (args_.size() == 3 && Util::ToLower(args_[1]) == "get") { |
| std::vector<std::string> values; |
| config->Get(args_[2], &values); |
| *output = Redis::MultiBulkString(values); |
| } else if (args_.size() == 4 && Util::ToLower(args_[1]) == "set") { |
| Status s = config->Set(args_[2], args_[3], svr); |
| if (!s.IsOK()) { |
| return Status(Status::NotOK, s.Msg() + ", key: " + args_[2]); |
| } |
| *output = Redis::SimpleString("OK"); |
| } else { |
| *output = Redis::Error("CONFIG subcommand must be one of GET, SET, REWRITE"); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandGet : public Commander { |
| public: |
| CommandGet() : Commander("get", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::string value; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.Get(args_[1], &value); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = s.IsNotFound() ? Redis::NilString() : Redis::BulkString(value); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandStrlen: public Commander { |
| public: |
| CommandStrlen() : Commander("strlen", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::string value; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.Get(args_[1], &value); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if (s.IsNotFound()) { |
| *output = Redis::Integer(0); |
| } else { |
| *output = Redis::Integer(value.size()); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandGetSet : public Commander { |
| public: |
| CommandGetSet() : Commander("getset", 3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| std::string old_value; |
| rocksdb::Status s = string_db.GetSet(args_[1], args_[2], &old_value); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::BulkString(old_value); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandGetRange: public Commander { |
| public: |
| CommandGetRange() : Commander("getrange", 4, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| start_ = std::stoi(args[2]); |
| stop_ = std::stoi(args[3]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::string value; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.Get(args_[1], &value); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if (s.IsNotFound()) { |
| *output = Redis::NilString(); |
| return Status::OK(); |
| } |
| if (start_ < 0) start_ = static_cast<int>(value.size()) + start_; |
| if (stop_ < 0) stop_ = static_cast<int>(value.size()) + stop_; |
| if (start_ < 0) start_ = 0; |
| if (stop_ > static_cast<int>(value.size())) stop_ = static_cast<int>(value.size()); |
| if (start_ > stop_) { |
| *output = Redis::NilString(); |
| } else { |
| *output = Redis::BulkString(value.substr(start_, stop_+1)); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| int start_ = 0, stop_ = 0; |
| }; |
| |
| class CommandSetRange: public Commander { |
| public: |
| CommandSetRange() : Commander("setrange", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| offset_ = std::stoi(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.SetRange(args_[1], offset_, args_[3], &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| int offset_ = 0; |
| }; |
| |
| class CommandMGet : public Commander { |
| public: |
| CommandMGet() : Commander("mget", -2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| std::vector<Slice> keys; |
| for (size_t i = 1; i < args_.size(); i++) { |
| keys.emplace_back(args_[i]); |
| } |
| std::vector<std::string> values; |
| // always return OK |
| string_db.MGet(keys, &values); |
| *output = Redis::MultiBulkString(values); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandAppend: public Commander { |
| public: |
| CommandAppend() : Commander("append", 3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.Append(args_[1], args_[2], &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSet : public Commander { |
| public: |
| CommandSet() : Commander("set", -3, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| for (size_t i = 3; i < args.size(); i++) { |
| std::string opt = Util::ToLower(args[i]); |
| if (opt == "nx" || opt == "xx") { |
| opt == "nx" ? nx_ = true : xx_ = true; |
| continue; |
| } |
| if ((opt == "ex" || opt == "px") && i+1 < args.size()) { |
| if (opt == "ex") { |
| ttl_ = atoi(args[i+1].c_str()); |
| } else { |
| ttl_ = atol(args[i+1].c_str())/1000; |
| } |
| i++; |
| continue; |
| } |
| return Status(Status::NotOK, "syntax error"); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s; |
| if (nx_) { |
| s = string_db.SetNX(args_[1], args_[2], ttl_, &ret); |
| } else if (xx_) { |
| s = string_db.SetXX(args_[1], args_[2], ttl_, &ret); |
| } else { |
| s = string_db.SetEX(args_[1], args_[2], ttl_); |
| } |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if ((nx_ || xx_) && !ret) { |
| *output = Redis::NilString(); |
| } else { |
| *output = Redis::SimpleString("OK"); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| bool xx_ = false; |
| bool nx_ = false; |
| int ttl_ = 0; |
| }; |
| |
| class CommandSetEX : public Commander { |
| public: |
| CommandSetEX() : Commander("setex", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| ttl_ = std::stoi(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.SetEX(args_[1], args_[3], ttl_); |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| |
| private: |
| int ttl_ = 0; |
| }; |
| |
| class CommandMSet : public Commander { |
| public: |
| CommandMSet() : Commander("mset", -3, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| if (args.size() % 2 != 1) { |
| return Status(Status::RedisParseErr, "wrong number of arguments"); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| std::vector<StringPair> kvs; |
| for (size_t i = 1; i < args_.size(); i+=2) { |
| kvs.emplace_back(StringPair{args_[i], args_[i+1]}); |
| } |
| rocksdb::Status s = string_db.MSet(kvs); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSetNX : public Commander { |
| public: |
| CommandSetNX() : Commander("setnx", 3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.SetNX(args_[1], args_[2], 0, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandIncr : public Commander { |
| public: |
| CommandIncr() : Commander("incr", 2, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int64_t ret; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.IncrBy(args_[1], 1, &ret); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandDecr : public Commander { |
| public: |
| CommandDecr() : Commander("decr", 2, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int64_t ret; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.IncrBy(args_[1], -1, &ret); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandIncrBy : public Commander { |
| public: |
| CommandIncrBy() : Commander("incrby", 3, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| increment_ = std::stoll(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int64_t ret; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.IncrBy(args_[1], increment_, &ret); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| int64_t increment_ = 0; |
| }; |
| |
| class CommandIncrByFloat : public Commander { |
| public: |
| CommandIncrByFloat() : Commander("incrbyfloat", 3, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| increment_ = std::stof(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| float ret; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.IncrByFloat(args_[1], increment_, &ret); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::BulkString(std::to_string(ret)); |
| return Status::OK(); |
| } |
| |
| private: |
| float increment_ = 0; |
| }; |
| |
| class CommandDecrBy : public Commander { |
| public: |
| CommandDecrBy() : Commander("decrby", 3, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| increment_ = std::stoll(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int64_t ret; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.IncrBy(args_[1], -1 * increment_, &ret); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| int64_t increment_ = 0; |
| }; |
| |
| class CommandDel : public Commander { |
| public: |
| CommandDel() : Commander("del", -2, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int cnt = 0; |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| for (unsigned int i = 1; i < args_.size(); i++) { |
| rocksdb::Status s = redis.Del(args_[i]); |
| if (s.ok()) cnt++; |
| } |
| *output = Redis::Integer(cnt); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandGetBit : public Commander { |
| public: |
| CommandGetBit() : Commander("getbit", 3, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| offset_ = std::stoul(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| bool bit; |
| Redis::Bitmap bitmap_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = bitmap_db.GetBit(args_[1], offset_, &bit); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::Integer(bit? 1 : 0); |
| return Status::OK(); |
| } |
| private: |
| uint32_t offset_ = 0; |
| }; |
| |
| class CommandSetBit : public Commander { |
| public: |
| CommandSetBit() : Commander("setbit", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| offset_ = std::stoul(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| if (args[3] == "0") { |
| bit_ = false; |
| } else if (args[3] == "1") { |
| bit_ = true; |
| } else { |
| return Status(Status::RedisParseErr, "bit should be 0 or 1"); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| bool old_bit; |
| Redis::Bitmap bitmap_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = bitmap_db.SetBit(args_[1], offset_, bit_, &old_bit); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::Integer(old_bit? 1 : 0); |
| return Status::OK(); |
| } |
| |
| private: |
| uint32_t offset_ = 0; |
| bool bit_ = false; |
| }; |
| |
| class CommandBitCount : public Commander { |
| public: |
| CommandBitCount() : Commander("bitcount", -2, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| if (args.size() >= 3) start_ = std::stoi(args[2]); |
| if (args.size() >= 4) stop_ = std::stoi(args[3]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| uint32_t cnt; |
| Redis::Bitmap bitmap_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = bitmap_db.BitCount(args_[1], start_, stop_, &cnt); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::Integer(cnt); |
| return Status::OK(); |
| } |
| private: |
| int start_ = 0, stop_ = -1; |
| }; |
| |
| class CommandBitPos: public Commander { |
| public: |
| CommandBitPos() : Commander("bitcount", -3, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| if (args.size() >= 4) start_ = std::stoi(args[3]); |
| if (args.size() >= 5) stop_ = std::stoi(args[4]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| if (args[2] == "0") { |
| bit_ = false; |
| } else if (args[2] == "1") { |
| bit_ = true; |
| } else { |
| return Status(Status::RedisParseErr, "bit should be 0 or 1"); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int pos; |
| Redis::Bitmap bitmap_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = bitmap_db.BitPos(args_[1], bit_, start_, stop_, &pos); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::Integer(pos); |
| return Status::OK(); |
| } |
| |
| private: |
| int start_ = 0, stop_ = -1; |
| bool bit_ = false; |
| }; |
| |
| class CommandType : public Commander { |
| public: |
| CommandType() : Commander("type", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| RedisType type; |
| rocksdb::Status s = redis.Type(args_[1], &type); |
| if (s.ok()) { |
| *output = Redis::BulkString(RedisTypeNames[type]); |
| return Status::OK(); |
| } |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| }; |
| |
| class CommandObject : public Commander { |
| public: |
| CommandObject() : Commander("object", 3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (Util::ToLower(args_[1]) == "dump") { |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> infos; |
| rocksdb::Status s = redis.Dump(args_[2], &infos); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| output->append(Redis::MultiLen(infos.size())); |
| for (const auto info : infos) { |
| output->append(Redis::BulkString(info)); |
| } |
| } else { |
| *output = Redis::Error("object subcommand must be dump"); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandTTL : public Commander { |
| public: |
| CommandTTL() : Commander("ttl", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| int ttl; |
| rocksdb::Status s = redis.TTL(args_[1], &ttl); |
| if (s.ok()) { |
| *output = Redis::Integer(ttl); |
| return Status::OK(); |
| } else { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| } |
| }; |
| |
| class CommandPTTL : public Commander { |
| public: |
| CommandPTTL() : Commander("pttl", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| int ttl; |
| rocksdb::Status s = redis.TTL(args_[1], &ttl); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| if (ttl > 0) { |
| *output = Redis::Integer(ttl*1000); |
| } else { |
| *output = Redis::Integer(ttl); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandExists : public Commander { |
| public: |
| CommandExists() : Commander("exists", -2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int cnt = 0; |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| std::vector<rocksdb::Slice> keys; |
| for (unsigned i = 1; i < args_.size(); i++) { |
| keys.emplace_back(args_[i]); |
| } |
| redis.Exists(keys, &cnt); |
| *output = Redis::Integer(cnt); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandExpire : public Commander { |
| public: |
| CommandExpire() : Commander("expire", 3, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| int64_t now; |
| rocksdb::Env::Default()->GetCurrentTime(&now); |
| try { |
| seconds_ = std::stoi(args[2]); |
| if (seconds_ >= INT32_MAX - now) { |
| return Status(Status::RedisParseErr, "the expire time was overflow"); |
| } |
| seconds_ += now; |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = redis.Expire(args_[1], seconds_); |
| if (s.ok()) { |
| *output = Redis::Integer(1); |
| } else { |
| *output = Redis::Integer(0); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| int seconds_ = 0; |
| }; |
| |
| class CommandPExpire : public Commander { |
| public: |
| CommandPExpire() : Commander("pexpire", 3, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| int64_t now; |
| rocksdb::Env::Default()->GetCurrentTime(&now); |
| try { |
| seconds_ = std::stol(args[2])/1000; |
| if (seconds_ >= INT32_MAX - now) { |
| return Status(Status::RedisParseErr, "the expire time was overflow"); |
| } |
| seconds_ += now; |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = redis.Expire(args_[1], seconds_); |
| if (s.ok()) { |
| *output = Redis::Integer(1); |
| } else { |
| *output = Redis::Integer(0); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| int seconds_ = 0; |
| }; |
| |
| class CommandExpireAt : public Commander { |
| public: |
| CommandExpireAt() : Commander("expireat", 3, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| timestamp_ = std::stoi(args[2]); |
| if (timestamp_ >= INT32_MAX) { |
| return Status(Status::RedisParseErr, "the expire time was overflow"); |
| } |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = redis.Expire(args_[1], timestamp_); |
| if (s.ok()) { |
| *output = Redis::Integer(1); |
| } else { |
| *output = Redis::Integer(0); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| int timestamp_ = 0; |
| }; |
| |
| class CommandPExpireAt : public Commander { |
| public: |
| CommandPExpireAt() : Commander("pexpireat", 3, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| timestamp_ = static_cast<int>(std::stol(args[2])/1000); |
| if (timestamp_ >= INT32_MAX) { |
| return Status(Status::RedisParseErr, "the expire time was overflow"); |
| } |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = redis.Expire(args_[1], timestamp_); |
| if (s.ok()) { |
| *output = Redis::Integer(1); |
| } else { |
| *output = Redis::Integer(0); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| int timestamp_ = 0; |
| }; |
| |
| class CommandPersist : public Commander { |
| public: |
| CommandPersist() : Commander("persist", 2, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ttl; |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = redis.TTL(args_[1], &ttl); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| if (ttl == -1 || ttl == -2) { |
| *output = Redis::Integer(0); |
| return Status::OK(); |
| } |
| s = redis.Expire(args_[1], 0); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::Integer(1); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHGet : public Commander { |
| public: |
| CommandHGet() : Commander("hget", 3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::string value; |
| rocksdb::Status s = hash_db.Get(args_[1], args_[2], &value); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = s.IsNotFound() ? Redis::NilString() : Redis::BulkString(value); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHSet : public Commander { |
| public: |
| CommandHSet() : Commander("hset", 4, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = hash_db.Set(args_[1], args_[2], args_[3], &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHSetNX : public Commander { |
| public: |
| CommandHSetNX() : Commander("hsetnx", 4, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = hash_db.SetNX(args_[1], args_[2], args_[3], &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHStrlen : public Commander { |
| public: |
| CommandHStrlen() : Commander("hstrlen", 3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::string value; |
| rocksdb::Status s = hash_db.Get(args_[1], args_[2], &value); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(static_cast<int>(value.size())); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHDel : public Commander { |
| public: |
| CommandHDel() : Commander("hdel", -3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::vector<Slice> fields; |
| for (unsigned int i = 2; i < args_.size(); i++) { |
| fields.emplace_back(Slice(args_[i])); |
| } |
| rocksdb::Status s = hash_db.Delete(args_[1], fields, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHExists : public Commander { |
| public: |
| CommandHExists() : Commander("hexists", 3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::string value; |
| rocksdb::Status s = hash_db.Get(args_[1], args_[2], &value); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = s.IsNotFound() ? Redis::Integer(0) : Redis::Integer(1); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHLen : public Commander { |
| public: |
| CommandHLen() : Commander("hlen", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| uint32_t count; |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = hash_db.Size(args_[1], &count); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = s.IsNotFound() ? Redis::Integer(0) : Redis::Integer(count); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHIncrBy : public Commander { |
| public: |
| CommandHIncrBy() : Commander("hincrby", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| increment_ = std::stoll(args[3]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int64_t ret; |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = hash_db.IncrBy(args_[1], args_[2], increment_, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| int64_t increment_ = 0; |
| }; |
| |
| class CommandHIncrByFloat : public Commander { |
| public: |
| CommandHIncrByFloat() : Commander("hincrbyfloat", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| increment_ = std::stof(args[3]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| float ret; |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = hash_db.IncrByFloat(args_[1], args_[2], increment_, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::BulkString(std::to_string(ret)); |
| return Status::OK(); |
| } |
| |
| private: |
| float increment_ = 0; |
| }; |
| |
| class CommandHMGet : public Commander { |
| public: |
| CommandHMGet() : Commander("hmget", -3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::vector<Slice> fields; |
| for (unsigned int i = 2; i < args_.size(); i++) { |
| fields.emplace_back(Slice(args_[i])); |
| } |
| std::vector<std::string> values; |
| rocksdb::Status s = hash_db.MGet(args_[1], fields, &values); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if (s.IsNotFound()) { |
| values.resize(fields.size(), ""); |
| } |
| *output = Redis::MultiBulkString(values); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHMSet : public Commander { |
| public: |
| CommandHMSet() : Commander("hmset", -4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| if (args.size() % 2 != 0) { |
| return Status(Status::RedisParseErr, "wrong number of arguments"); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::vector<FieldValue> field_values; |
| for (unsigned int i = 2; i < args_.size(); i += 2) { |
| field_values.push_back(FieldValue{args_[i], args_[i + 1]}); |
| } |
| rocksdb::Status s = hash_db.MSet(args_[1], field_values, false, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHKeys : public Commander { |
| public: |
| CommandHKeys() : Commander("hkeys", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::vector<FieldValue> field_values; |
| rocksdb::Status s = hash_db.GetAll(args_[1], &field_values, HashFetchType::kOnlyKey); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| std::vector<std::string> keys; |
| for (const auto fv : field_values) { |
| keys.emplace_back(fv.field); |
| } |
| *output = Redis::MultiBulkString(keys); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHVals : public Commander { |
| public: |
| CommandHVals() : Commander("hvals", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::vector<FieldValue> field_values; |
| rocksdb::Status s = hash_db.GetAll(args_[1], &field_values, HashFetchType::kOnlyValue); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| std::vector<std::string> values; |
| for (const auto fv : field_values) { |
| values.emplace_back(fv.value); |
| } |
| *output = Redis::MultiBulkString(values); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHGetAll : public Commander { |
| public: |
| CommandHGetAll() : Commander("hgetall", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::vector<FieldValue> field_values; |
| rocksdb::Status s = hash_db.GetAll(args_[1], &field_values); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = "*" + std::to_string(field_values.size() * 2) + CRLF; |
| for (const auto fv : field_values) { |
| *output += Redis::BulkString(fv.field); |
| *output += Redis::BulkString(fv.value); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandPush : public Commander { |
| public: |
| CommandPush(bool create_if_missing, bool left) |
| : Commander("push", -3, true) { |
| left_ = left; |
| create_if_missing_ = create_if_missing; |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| std::vector<Slice> elems; |
| for (unsigned int i = 2; i < args_.size(); i++) { |
| elems.emplace_back(args_[i]); |
| } |
| int ret; |
| rocksdb::Status s; |
| if (create_if_missing_) { |
| s = list_db.Push(args_[1], elems, left_, &ret); |
| } else { |
| s = list_db.PushX(args_[1], elems, left_, &ret); |
| } |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| |
| svr->WakeupBlockingConns(args_[1], elems.size()); |
| |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| bool left_; |
| bool create_if_missing_; |
| }; |
| |
| class CommandLPush : public CommandPush { |
| public: |
| CommandLPush() : CommandPush(true, true) { name_ = "lpush"; } |
| }; |
| |
| class CommandRPush : public CommandPush { |
| public: |
| CommandRPush() : CommandPush(true, false) { name_ = "rpush"; } |
| }; |
| |
| class CommandLPushX : public CommandPush { |
| public: |
| CommandLPushX() : CommandPush(false, true) { name_ = "lpushx"; } |
| }; |
| |
| class CommandRPushX : public CommandPush { |
| public: |
| CommandRPushX() : CommandPush(false, false) { name_ = "rpushx"; } |
| }; |
| |
| class CommandPop : public Commander { |
| public: |
| explicit CommandPop(bool left) : Commander("pop", 2, true) { left_ = left; } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| std::string elem; |
| rocksdb::Status s = list_db.Pop(args_[1], &elem, left_); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if (s.IsNotFound()) { |
| *output = Redis::NilString(); |
| } else { |
| *output = Redis::BulkString(elem); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| bool left_; |
| }; |
| |
| class CommandLPop : public CommandPop { |
| public: |
| CommandLPop() : CommandPop(true) { name_ = "lpop"; } |
| }; |
| |
| class CommandRPop : public CommandPop { |
| public: |
| CommandRPop() : CommandPop(false) { name_ = "rpop"; } |
| }; |
| |
| class CommandBPop : public Commander { |
| public: |
| explicit CommandBPop(bool left) : Commander("bpop", -3, true) { left_ = left; } |
| ~CommandBPop() { |
| if (timer_ != nullptr) { |
| event_free(timer_); |
| timer_ = nullptr; |
| } |
| } |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| timeout_ = std::stoi(args[args.size() - 1]); |
| if (timeout_ < 0) { |
| return Status(Status::RedisParseErr, "timeout should not be negative"); |
| } |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, "timeout is not an integer or out of range"); |
| } |
| keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1); |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| svr_ = svr; |
| conn_ = conn; |
| |
| auto bev = conn->GetBufferEvent(); |
| auto s = TryPopFromList(); |
| if (s.ok() || !s.IsNotFound()) { |
| return Status::OK(); // error has already output in TryPopFromList |
| } |
| for (const auto &key : keys_) { |
| svr_->AddBlockingKey(key, conn_); |
| } |
| bufferevent_setcb(bev, nullptr, WriteCB, EventCB, this); |
| if (timeout_) { |
| timer_ = evtimer_new(bufferevent_get_base(bev), TimerCB, this); |
| timeval tm = {timeout_, 0}; |
| evtimer_add(timer_, &tm); |
| } |
| return Status::OK(); |
| } |
| |
| rocksdb::Status TryPopFromList() { |
| Redis::List list_db(svr_->storage_, conn_->GetNamespace()); |
| std::string elem; |
| rocksdb::Status s; |
| for (const auto &key : keys_) { |
| s = list_db.Pop(key, &elem, left_); |
| if (s.ok() || !s.IsNotFound()) { |
| break; |
| } |
| } |
| if (s.ok()) { |
| conn_->Reply(Redis::BulkString(elem)); |
| } else if (!s.IsNotFound()) { |
| conn_->Reply(Redis::Error("ERR " + s.ToString())); |
| LOG(ERROR) << "Failed to execute redis command: " << conn_->current_cmd_->Name() |
| << ", err: " << s.ToString(); |
| } |
| return s; |
| } |
| |
| static void WriteCB(bufferevent *bev, void *ctx) { |
| auto self = reinterpret_cast<CommandBPop *>(ctx); |
| auto s = self->TryPopFromList(); |
| // if pop fail ,currently we compromised to close bpop request |
| if (s.IsNotFound()) { |
| self->conn_->Reply(Redis::NilString()); |
| LOG(ERROR) << "[BPOP] Failed to execute redis command: " << self->conn_->current_cmd_->Name() |
| << ", err: another concurrent pop request must have stole the data before this bpop request" |
| << " or bpop is in a pipeline cmd list(cmd before bpop replyed trigger this writecb)"; |
| } |
| if (self->timer_ != nullptr) { |
| event_free(self->timer_); |
| self->timer_ = nullptr; |
| } |
| self->unBlockingAll(); |
| bufferevent_setcb(bev, Redis::Connection::OnRead, Redis::Connection::OnWrite, |
| Redis::Connection::OnEvent, self->conn_); |
| bufferevent_enable(bev, EV_READ); |
| } |
| |
| static void EventCB(bufferevent *bev, int16_t events, void *ctx) { |
| auto self = static_cast<CommandBPop *>(ctx); |
| if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) { |
| if (self->timer_ != nullptr) { |
| event_free(self->timer_); |
| self->timer_ = nullptr; |
| } |
| self->unBlockingAll(); |
| } |
| Redis::Connection::OnEvent(bev, events, self->conn_); |
| } |
| |
| static void TimerCB(int, int16_t events, void *ctx) { |
| auto self = reinterpret_cast<CommandBPop *>(ctx); |
| self->conn_->Reply(Redis::NilString()); |
| event_free(self->timer_); |
| self->timer_ = nullptr; |
| self->unBlockingAll(); |
| auto bev = self->conn_->GetBufferEvent(); |
| bufferevent_setcb(bev, Redis::Connection::OnRead, Redis::Connection::OnWrite, |
| Redis::Connection::OnEvent, self->conn_); |
| bufferevent_enable(bev, EV_READ); |
| } |
| |
| private: |
| bool left_ = false; |
| int timeout_ = 0; // second |
| std::vector<std::string> keys_; |
| Server *svr_ = nullptr; |
| Connection *conn_ = nullptr; |
| event *timer_ = nullptr; |
| |
| void unBlockingAll() { |
| for (const auto &key : keys_) { |
| svr_->UnBlockingKey(key, conn_); |
| } |
| } |
| }; |
| |
| class CommandBLPop : public CommandBPop { |
| public: |
| CommandBLPop() : CommandBPop(true) { name_ = "blpop"; } |
| }; |
| |
| class CommandBRPop : public CommandBPop { |
| public: |
| CommandBRPop() : CommandBPop(false) { name_ = "brpop"; } |
| }; |
| |
| class CommandLRem : public Commander { |
| public: |
| CommandLRem() : Commander("lrem", 4, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| count_ = std::stoi(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = list_db.Rem(args_[1], count_, args_[3], &ret); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| int count_ = 0; |
| }; |
| |
| class CommandLInsert : public Commander { |
| public: |
| CommandLInsert() : Commander("linsert", 5, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| if ((Util::ToLower(args[2]) == "before")) { |
| before_ = true; |
| } else if ((Util::ToLower(args[2]) == "after")) { |
| before_ = false; |
| } else { |
| return Status(Status::RedisParseErr, "syntax error"); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = list_db.Insert(args_[1], args_[3], args_[4], before_, &ret); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| bool before_ = false; |
| }; |
| |
| class CommandLRange : public Commander { |
| public: |
| CommandLRange() : Commander("lrange", 4, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| start_ = std::stoi(args[2]); |
| stop_ = std::stoi(args[3]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> elems; |
| rocksdb::Status s = list_db.Range(args_[1], start_, stop_, &elems); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::MultiBulkString(elems); |
| return Status::OK(); |
| } |
| |
| private: |
| int start_ = 0, stop_ = 0; |
| }; |
| |
| class CommandLLen : public Commander { |
| public: |
| CommandLLen() : Commander("llen", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| uint32_t count; |
| rocksdb::Status s = list_db.Size(args_[1], &count); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(count); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandLIndex : public Commander { |
| public: |
| CommandLIndex() : Commander("lindex", 3, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| index_ = std::stoi(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| std::string elem; |
| rocksdb::Status s = list_db.Index(args_[1], index_, &elem); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::BulkString(elem); |
| return Status::OK(); |
| } |
| |
| private: |
| int index_ = 0; |
| }; |
| |
| class CommandLSet : public Commander { |
| public: |
| CommandLSet() : Commander("lset", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| index_ = std::stoi(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = list_db.Set(args_[1], index_, args_[3]); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| |
| private: |
| int index_ = 0; |
| }; |
| |
| class CommandLTrim : public Commander { |
| public: |
| CommandLTrim() : Commander("ltrim", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| start_ = std::stoi(args[2]); |
| stop_ = std::stoi(args[3]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = list_db.Trim(args_[1], start_, stop_); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| |
| private: |
| int start_ = 0, stop_ = 0; |
| }; |
| |
| class CommandRPopLPUSH : public Commander { |
| public: |
| CommandRPopLPUSH() : Commander("rpoplpush", 3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| std::string elem; |
| rocksdb::Status s = list_db.RPopLPush(args_[1], args_[2], &elem); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = s.IsNotFound() ? Redis::NilString() : Redis::BulkString(elem); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSAdd : public Commander { |
| public: |
| CommandSAdd() : Commander("sadd", -3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| std::vector<Slice> members; |
| for (unsigned int i = 2; i < args_.size(); i++) { |
| members.emplace_back(args_[i]); |
| } |
| int ret; |
| rocksdb::Status s = set_db.Add(args_[1], members, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSRem : public Commander { |
| public: |
| CommandSRem() : Commander("srem", -3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| std::vector<Slice> members; |
| for (unsigned int i = 2; i < args_.size(); i++) { |
| members.emplace_back(args_[i]); |
| } |
| int ret; |
| rocksdb::Status s = set_db.Remove(args_[1], members, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSCard : public Commander { |
| public: |
| CommandSCard() : Commander("scard", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| int ret; |
| rocksdb::Status s = set_db.Card(args_[1], &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSMembers : public Commander { |
| public: |
| CommandSMembers() : Commander("smembers", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> members; |
| rocksdb::Status s = set_db.Members(args_[1], &members); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::MultiBulkString(members); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSIsMember : public Commander { |
| public: |
| CommandSIsMember() : Commander("sismmeber", 3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| int ret; |
| rocksdb::Status s = set_db.IsMember(args_[1], args_[2], &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSPop : public Commander { |
| public: |
| CommandSPop() : Commander("spop", -2, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| if (args.size() == 3) { |
| count_ = std::stoi(args[2]); |
| } |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> members; |
| rocksdb::Status s = set_db.Take(args_[1], &members, count_, true); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::MultiBulkString(members); |
| return Status::OK(); |
| } |
| |
| private: |
| int count_ = 1; |
| }; |
| |
| class CommandSRandMember : public Commander { |
| public: |
| CommandSRandMember() : Commander("srandmember", -2, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| if (args.size() == 3) { |
| count_ = std::stoi(args[2]); |
| } |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> members; |
| rocksdb::Status s = set_db.Take(args_[1], &members, count_, false); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::MultiBulkString(members); |
| return Status::OK(); |
| } |
| |
| private: |
| int count_ = 1; |
| }; |
| |
| class CommandSMove : public Commander { |
| public: |
| CommandSMove() : Commander("smove", 4, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| int ret; |
| rocksdb::Status s = set_db.Move(args_[1], args_[2], args_[3], &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSDiff : public Commander { |
| public: |
| CommandSDiff() : Commander("sdiff", -2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::vector<Slice> keys; |
| for (size_t i = 1; i < args_.size(); i++) { |
| keys.emplace_back(args_[i]); |
| } |
| std::vector<std::string> members; |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| auto s = set_db.Diff(keys, &members); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::MultiBulkString(members); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSUnion : public Commander { |
| public: |
| CommandSUnion() : Commander("sunion", -2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::vector<Slice> keys; |
| for (size_t i = 1; i < args_.size(); i++) { |
| keys.emplace_back(args_[i]); |
| } |
| std::vector<std::string> members; |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| auto s = set_db.Union(keys, &members); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::MultiBulkString(members); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSInter : public Commander { |
| public: |
| CommandSInter() : Commander("sinter", -2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::vector<Slice> keys; |
| for (size_t i = 1; i < args_.size(); i++) { |
| keys.emplace_back(args_[i]); |
| } |
| std::vector<std::string> members; |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| auto s = set_db.Inter(keys, &members); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::MultiBulkString(members); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSDiffStore: public Commander { |
| public: |
| CommandSDiffStore() : Commander("sdiffstore", -3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret = 0; |
| std::vector<Slice> keys; |
| for (size_t i = 2; i < args_.size(); i++) { |
| keys.emplace_back(args_[i]); |
| } |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| auto s = set_db.DiffStore(args_[1], keys, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSUnionStore: public Commander { |
| public: |
| CommandSUnionStore() : Commander("sunionstore", -3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret = 0; |
| std::vector<Slice> keys; |
| for (size_t i = 2; i < args_.size(); i++) { |
| keys.emplace_back(args_[i]); |
| } |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| auto s = set_db.UnionStore(args_[1], keys, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSInterStore: public Commander { |
| public: |
| CommandSInterStore() : Commander("sinterstore", -3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret = 0; |
| std::vector<Slice> keys; |
| for (size_t i = 2; i < args_.size(); i++) { |
| keys.emplace_back(args_[i]); |
| } |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| auto s = set_db.InterStore(args_[1], keys, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandZAdd : public Commander { |
| public: |
| CommandZAdd() : Commander("zadd", -4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| if (args.size() % 2 != 0) { |
| return Status(Status::RedisParseErr, "syntax error"); |
| } |
| |
| try { |
| for (unsigned i = 2; i < args.size(); i += 2) { |
| double score = std::stod(args[i]); |
| member_scores_.emplace_back(MemberScore{args[i + 1], score}); |
| } |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "ERR value is not a valid float"); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.Add(args_[1], 0, &member_scores_, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| std::vector<MemberScore> member_scores_; |
| }; |
| |
| class CommandZCount : public Commander { |
| public: |
| CommandZCount() : Commander("zcount", 4, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| Status s = Redis::ZSet::ParseRangeSpec(args[2], args[3], &spec_); |
| if (!s.IsOK()) { |
| return Status(Status::RedisParseErr, s.Msg()); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.Count(args_[1], spec_, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| ZRangeSpec spec_; |
| }; |
| |
| class CommandZCard : public Commander { |
| public: |
| CommandZCard() : Commander("zcard", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.Card(args_[1], &ret); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandZIncrBy : public Commander { |
| public: |
| CommandZIncrBy() : Commander("zincrby", 4, true) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| incr_ = std::stod(args[2]); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "value is not an double or out of range"); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| double score; |
| |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.IncrBy(args_[1], args_[3], incr_, &score); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::BulkString(std::to_string(score)); |
| return Status::OK(); |
| } |
| |
| private: |
| double incr_ = 0.0; |
| }; |
| |
| class CommandZLexCount : public Commander { |
| public: |
| CommandZLexCount() : Commander("zlexcount", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| Status s = Redis::ZSet::ParseRangeLexSpec(args[2], args[3], &spec_); |
| if (!s.IsOK()) { |
| return Status(Status::RedisParseErr, s.Msg()); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int size; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.RangeByLex(args_[1], spec_, nullptr, &size); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(size); |
| return Status::OK(); |
| } |
| |
| private: |
| ZRangeLexSpec spec_; |
| }; |
| |
| class CommandZPop : public Commander { |
| public: |
| explicit CommandZPop(bool min) : Commander("zpop", -2, true), min_(min) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| if (args.size() > 2) { |
| try { |
| count_ = std::stoi(args[2]); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| std::vector<MemberScore> memeber_scores; |
| rocksdb::Status s = zset_db.Pop(args_[1], count_, min_, &memeber_scores); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| output->append(Redis::MultiLen(memeber_scores.size() * 2)); |
| for (const auto ms : memeber_scores) { |
| output->append(Redis::BulkString(ms.member)); |
| output->append(Redis::BulkString(std::to_string(ms.score))); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| bool min_; |
| int count_ = 1; |
| }; |
| |
| class CommandZPopMin : public CommandZPop { |
| public: |
| CommandZPopMin() : CommandZPop(true) { name_ = "zpopmin"; } |
| }; |
| |
| class CommandZPopMax : public CommandZPop { |
| public: |
| CommandZPopMax() : CommandZPop(false) { name_ = "zpopmax"; } |
| }; |
| |
| class CommandZRange : public Commander { |
| public: |
| explicit CommandZRange(bool reversed = false) |
| : Commander("zrange", -4, false), reversed_(reversed) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| start_ = std::stoi(args[2]); |
| stop_ = std::stoi(args[3]); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| if (args.size() > 4 && (Util::ToLower(args[4]) == "withscores")) { |
| with_scores_ = true; |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| std::vector<MemberScore> memeber_scores; |
| uint8_t flags = !reversed_ ? 0 : ZSET_REVERSED; |
| rocksdb::Status s = |
| zset_db.Range(args_[1], start_, stop_, flags, &memeber_scores); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if (!with_scores_) { |
| output->append(Redis::MultiLen(memeber_scores.size())); |
| } else { |
| output->append(Redis::MultiLen(memeber_scores.size() * 2)); |
| } |
| for (const auto ms : memeber_scores) { |
| output->append(Redis::BulkString(ms.member)); |
| if (with_scores_) |
| output->append(Redis::BulkString(std::to_string(ms.score))); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| int start_ = 0; |
| int stop_ = 0; |
| bool reversed_; |
| bool with_scores_ = false; |
| }; |
| |
| class CommandZRevRange : public CommandZRange { |
| public: |
| CommandZRevRange() : CommandZRange(true) { name_ = "zrevrange"; } |
| }; |
| |
| class CommandZRangeByLex : public Commander { |
| public: |
| CommandZRangeByLex() : Commander("zrangebylex", -4, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| Status s = Redis::ZSet::ParseRangeLexSpec(args[2], args[3], &spec_); |
| if (!s.IsOK()) { |
| return Status(Status::RedisParseErr, s.Msg()); |
| } |
| try { |
| if (args.size() == 7 && Util::ToLower(args[4]) == "limit") { |
| spec_.offset = std::stoi(args[5]); |
| spec_.count = std::stoi(args[6]); |
| } |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int size; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> members; |
| rocksdb::Status s = zset_db.RangeByLex(args_[1], spec_, &members, &size); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::MultiBulkString(members); |
| return Status::OK(); |
| } |
| |
| private: |
| ZRangeLexSpec spec_; |
| }; |
| |
| class CommandZRangeByScore : public Commander { |
| public: |
| explicit CommandZRangeByScore(bool reversed = false) : Commander("zrangebyscore", -4, false) { |
| spec_.reversed = reversed; |
| } |
| Status Parse(const std::vector<std::string> &args) override { |
| Status s; |
| if (spec_.reversed) { |
| s = Redis::ZSet::ParseRangeSpec(args[3], args[2], &spec_); |
| } else { |
| s = Redis::ZSet::ParseRangeSpec(args[2], args[3], &spec_); |
| } |
| if (!s.IsOK()) { |
| return Status(Status::RedisParseErr, s.Msg()); |
| } |
| try { |
| size_t i = 4; |
| while (i < args.size()) { |
| if (Util::ToLower(args[i]) == "withscores") { |
| with_scores_ = true; |
| i++; |
| } else if (Util::ToLower(args[i]) == "limit" && i + 2 < args.size()) { |
| spec_.offset = std::stoi(args[i + 1]); |
| spec_.count = std::stoi(args[i + 2]); |
| i += 3; |
| } else { |
| return Status(Status::RedisParseErr, "syntax error"); |
| } |
| } |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int size; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| std::vector<MemberScore> memeber_scores; |
| rocksdb::Status s = |
| zset_db.RangeByScore(args_[1], spec_, &memeber_scores, &size); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if (!with_scores_) { |
| output->append(Redis::MultiLen(memeber_scores.size())); |
| } else { |
| output->append(Redis::MultiLen(memeber_scores.size() * 2)); |
| } |
| for (const auto ms : memeber_scores) { |
| output->append(Redis::BulkString(ms.member)); |
| if (with_scores_) |
| output->append(Redis::BulkString(std::to_string(ms.score))); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| ZRangeSpec spec_; |
| bool with_scores_ = false; |
| }; |
| |
| class CommandZRank : public Commander { |
| public: |
| explicit CommandZRank(bool reversed = false) |
| : Commander("zrank", 3, false), reversed_(reversed) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int rank; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.Rank(args_[1], args_[2], reversed_, &rank); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if (rank == -1) { |
| *output = Redis::NilString(); |
| } else { |
| *output = Redis::Integer(rank); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| bool reversed_; |
| }; |
| |
| class CommandZRevRank : public CommandZRank { |
| public: |
| CommandZRevRank() : CommandZRank(true) { name_ = "zrevrank"; } |
| }; |
| |
| class CommandZRevRangeByScore : public CommandZRangeByScore { |
| public: |
| CommandZRevRangeByScore() : CommandZRangeByScore(true) { name_ = "zrevrangebyscore"; } |
| }; |
| |
| class CommandZRem : public Commander { |
| public: |
| CommandZRem() : Commander("zrem", -3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int size; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| std::vector<rocksdb::Slice> members; |
| for (unsigned i = 2; i < args_.size(); i++) { |
| members.emplace_back(args_[i]); |
| } |
| rocksdb::Status s = zset_db.Remove(args_[1], members, &size); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(size); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandZRemRangeByRank : public Commander { |
| public: |
| CommandZRemRangeByRank() : Commander("zremrangebyrank", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| start_ = std::stoi(args[2]); |
| stop_ = std::stoi(args[3]); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = |
| zset_db.RemoveRangeByRank(args_[1], start_, stop_, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| int start_ = 0; |
| int stop_ = 0; |
| }; |
| |
| class CommandZRemRangeByScore : public Commander { |
| public: |
| CommandZRemRangeByScore() : Commander("zremrangebyscore", -4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| Status s = Redis::ZSet::ParseRangeSpec(args[2], args[3], &spec_); |
| if (!s.IsOK()) { |
| return Status(Status::RedisParseErr, s.Msg()); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int size; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.RemoveRangeByScore(args_[1], spec_, &size); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(size); |
| return Status::OK(); |
| } |
| |
| private: |
| ZRangeSpec spec_; |
| }; |
| |
| class CommandZRemRangeByLex : public Commander { |
| public: |
| CommandZRemRangeByLex() : Commander("zremrangebylex", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| Status s = Redis::ZSet::ParseRangeLexSpec(args[2], args[3], &spec_); |
| if (!s.IsOK()) { |
| return Status(Status::RedisParseErr, s.Msg()); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int size; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.RemoveRangeByLex(args_[1], spec_, &size); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(size); |
| return Status::OK(); |
| } |
| |
| private: |
| ZRangeLexSpec spec_; |
| }; |
| |
| class CommandZScore : public Commander { |
| public: |
| CommandZScore() : Commander("zscore", 3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| double score; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.Score(args_[1], args_[2], &score); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if (s.IsNotFound()) { |
| *output = Redis::NilString(); |
| } else { |
| *output = Redis::BulkString(std::to_string(score)); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandZUnionStore : public Commander { |
| public: |
| CommandZUnionStore() : Commander("zunionstore", -4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| numkeys_ = std::stoi(args[2]); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| if (numkeys_ > args.size() - 3) { |
| return Status(Status::RedisParseErr, "syntax error"); |
| } |
| size_t j = 0; |
| while (j < numkeys_) { |
| keys_weights_.emplace_back(KeyWeight{args[j + 3], 1}); |
| j++; |
| } |
| size_t i = 3 + numkeys_; |
| while (i < args.size()) { |
| if (Util::ToLower(args[i]) == "aggregate" && i + 1 < args.size()) { |
| if (Util::ToLower(args[i + 1]) == "sum") { |
| aggregate_method_ = kAggregateSum; |
| } else if (Util::ToLower(args[i + 1]) == "min") { |
| aggregate_method_ = kAggregateMin; |
| } else if (Util::ToLower(args[i + 1]) == "max") { |
| aggregate_method_ = kAggregateMax; |
| } else { |
| return Status(Status::RedisParseErr, "aggregate para error"); |
| } |
| i += 2; |
| } else if (Util::ToLower(args[i]) == "weights" && i + numkeys_ < args.size()) { |
| size_t j = 0; |
| while (j < numkeys_) { |
| try { |
| keys_weights_[j].weight = std::stod(args[i + j + 1]); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "value is not an double or out of range"); |
| } |
| j++; |
| } |
| i += numkeys_ + 1; |
| } else { |
| return Status(Status::RedisParseErr, "syntax error"); |
| } |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int size; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.UnionStore(args_[1], keys_weights_, aggregate_method_, &size); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(size); |
| return Status::OK(); |
| } |
| |
| protected: |
| size_t numkeys_ = 0; |
| std::vector<KeyWeight> keys_weights_; |
| AggregateMethod aggregate_method_ = kAggregateSum; |
| }; |
| |
| class CommandZInterStore : public CommandZUnionStore { |
| public: |
| CommandZInterStore() : CommandZUnionStore() { name_ = "zinterstore"; } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int size; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.InterStore(args_[1], keys_weights_, aggregate_method_, &size); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(size); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandInfo : public Commander { |
| public: |
| CommandInfo() : Commander("info", -1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::string section = "all"; |
| if (args_.size() == 2) { |
| section = Util::ToLower(args_[1]); |
| } |
| std::string info; |
| svr->GetInfo(conn->GetNamespace(), section, &info); |
| *output = Redis::BulkString(info); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandCompact : public Commander { |
| public: |
| CommandCompact() : Commander("compact", 1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (!conn->IsAdmin()) { |
| *output = Redis::Error("only administrator can compact the db"); |
| return Status::OK(); |
| } |
| Status s = svr->AsyncCompactDB(); |
| if (!s.IsOK()) return s; |
| *output = Redis::SimpleString("OK"); |
| LOG(INFO) << "Commpact was triggered by manual with executed success"; |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandBGSave: public Commander { |
| public: |
| CommandBGSave() : Commander("bgsave", 1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (!conn->IsAdmin()) { |
| *output = Redis::Error("only administrator can do bgsave command"); |
| return Status::OK(); |
| } |
| Status s = svr->AsyncBgsaveDB(); |
| if (!s.IsOK()) return s; |
| *output = Redis::SimpleString("OK"); |
| LOG(INFO) << "BGSave was triggered by manual with executed success"; |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandDBSize : public Commander { |
| public: |
| CommandDBSize() : Commander("dbsize", -1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::string ns = conn->GetNamespace(); |
| if (args_.size() == 1) { |
| KeyNumStats stats; |
| svr->GetLastestKeyNumStats(ns, &stats); |
| *output = Redis::Integer(stats.n_key); |
| } else if (args_.size() == 2 && args_[1] == "scan") { |
| Status s = svr->AsyncScanDBSize(ns); |
| if (s.IsOK()) { |
| *output = Redis::SimpleString("OK"); |
| } else { |
| *output = Redis::Error(s.Msg()); |
| } |
| } else { |
| *output = Redis::Error("DBSIZE subcommand only supports scan"); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandPublish : public Commander { |
| public: |
| CommandPublish() : Commander("publish", 3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::PubSub pubsub_db(svr->storage_); |
| auto s = pubsub_db.Publish(args_[1], args_[2]); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| |
| int receivers = svr->PublishMessage(args_[1], args_[2]); |
| *output = Redis::Integer(receivers); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSubscribe : public Commander { |
| public: |
| CommandSubscribe() : Commander("subcribe", -2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| for (unsigned i = 1; i < args_.size(); i++) { |
| conn->SubscribeChannel(args_[i]); |
| output->append(Redis::MultiLen(3)); |
| output->append(Redis::BulkString("subscribe")); |
| output->append(Redis::BulkString(args_[i])); |
| output->append(Redis::Integer(conn->SubscriptionsCount())); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandUnSubscribe : public Commander { |
| public: |
| CommandUnSubscribe() : Commander("unsubcribe", -1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (args_.size() > 1) { |
| conn->UnSubscribeChannel(args_[1]); |
| } else { |
| conn->UnSubscribeAll(); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandPSubscribe : public Commander { |
| public: |
| CommandPSubscribe() : Commander("psubcribe", -2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| for (unsigned i = 1; i < args_.size(); i++) { |
| conn->PSubscribeChannel(args_[i]); |
| output->append(Redis::MultiLen(3)); |
| output->append(Redis::BulkString("psubscribe")); |
| output->append(Redis::BulkString(args_[i])); |
| output->append(Redis::Integer(conn->PSubscriptionsCount())); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandPUnSubscribe : public Commander { |
| public: |
| CommandPUnSubscribe() : Commander("punsubcribe", -1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (args_.size() > 1) { |
| conn->PUnSubscribeChannel(args_[1]); |
| } else { |
| conn->PUnSubscribeAll(); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandPubSub : public Commander { |
| public: |
| CommandPubSub() : Commander("pubsub", -2, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| subcommand_ = Util::ToLower(args[1]); |
| if (subcommand_ == "numpat" && args.size() == 2) { |
| return Status::OK(); |
| } |
| if ((subcommand_ == "numsub") && args.size() >= 2) { |
| if (args.size() > 2) { |
| channels_ = std::vector<std::string>(args.begin() + 2, args.end()); |
| } |
| return Status::OK(); |
| } |
| if ((subcommand_ == "channels") && args.size() <= 3) { |
| if (args.size() == 3) { |
| pattern_ = args[2]; |
| } |
| return Status::OK(); |
| } |
| return Status(Status::RedisInvalidCmd, |
| "ERR Unknown subcommand or wrong number of arguments"); |
| } |
| |
| Status Execute(Server *srv, Connection *conn, std::string *output) override { |
| if (subcommand_ == "numpat") { |
| *output = Redis::Integer(srv->GetPubSubPatternSize()); |
| return Status::OK(); |
| } else if (subcommand_ == "numsub") { |
| std::vector<ChannelSubscribeNum> channel_subscribe_nums; |
| srv->ListChannelSubscribeNum(channels_, &channel_subscribe_nums); |
| output->append(Redis::MultiLen(channel_subscribe_nums.size() * 2)); |
| for (const auto chan_subscribe_num : channel_subscribe_nums) { |
| output->append(Redis::BulkString(chan_subscribe_num.channel)); |
| output->append(Redis::Integer(chan_subscribe_num.subscribe_num)); |
| } |
| return Status::OK(); |
| } else if (subcommand_ == "channels") { |
| std::vector<std::string> channels; |
| srv->GetChannelsByPattern(pattern_, &channels); |
| *output = Redis::MultiBulkString(channels); |
| return Status::OK(); |
| } |
| |
| return Status(Status::RedisInvalidCmd, |
| "ERR Unknown subcommand or wrong number of arguments"); |
| } |
| |
| private: |
| std::string pattern_; |
| std::vector<std::string> channels_; |
| std::string subcommand_; |
| }; |
| |
| class CommandSlaveOf : public Commander { |
| public: |
| CommandSlaveOf() : Commander("slaveof", 3, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| host_ = args[1]; |
| auto port = args[2]; |
| if (Util::ToLower(host_) == "no" && Util::ToLower(port) == "one") { |
| host_.clear(); |
| return Status::OK(); |
| } |
| try { |
| auto p = std::stoul(port); |
| if (p > UINT32_MAX) { |
| throw std::overflow_error("port out of range"); |
| } |
| port_ = static_cast<uint32_t>(p); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "port should be number"); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (!conn->IsAdmin()) { |
| *output = Redis::Error("only administrator can use slaveof command"); |
| return Status::OK(); |
| } |
| Status s; |
| if (host_.empty()) { |
| s = svr->RemoveMaster(); |
| if (s.IsOK()) { |
| *output = Redis::SimpleString("OK"); |
| LOG(WARNING) << "MASTER MODE enabled (user request from '" << conn->GetAddr() << "')"; |
| } |
| } else { |
| s = svr->AddMaster(host_, port_); |
| if (s.IsOK()) { |
| *output = Redis::SimpleString("OK"); |
| LOG(WARNING) << "SLAVE OF " << host_ << ":" << port_ |
| << " enabled (user request from '" << conn->GetAddr() << "')"; |
| } else { |
| LOG(ERROR) << "SLAVE OF " << host_ << ":" << port_ |
| << " (user request from '" << conn->GetAddr() << "') encounter error: " << s.Msg(); |
| } |
| } |
| return s; |
| } |
| |
| private: |
| std::string host_; |
| uint32_t port_ = 0; |
| }; |
| |
| class CommandStats: public Commander { |
| public: |
| CommandStats() : Commander("stats", 1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::string stats_json = svr->GetRocksDBStatsJson(); |
| *output = Redis::BulkString(stats_json); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandPSync : public Commander { |
| public: |
| CommandPSync() : Commander("psync", 2, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| auto s = std::stoull(args[1]); |
| next_repl_seq = static_cast<rocksdb::SequenceNumber>(s); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "value is not an unsigned long long or out of range"); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| LOG(INFO) << "Slave " << conn->GetAddr() << " asks for synchronization" |
| << " with next sequence: " << next_repl_seq |
| << ", and local sequence: " << svr->storage_->LatestSeq(); |
| if (!checkWALBoundary(svr->storage_, next_repl_seq).IsOK()) { |
| svr->stats_.IncrPSyncErrCounter(); |
| *output = "sequence out of range, please use fullsync"; |
| return Status(Status::RedisExecErr, *output); |
| } |
| svr->stats_.IncrPSyncOKCounter(); |
| Status s = svr->AddSlave(conn, next_repl_seq); |
| if (!s.IsOK()) return s; |
| LOG(INFO) << "New slave: " << conn->GetAddr() << " was added, start increment syncing"; |
| conn->EnableFlag(Redis::Connection::kSlave); |
| // server would spawn a new thread to sync the batch, |
| // and connection would be took over, so should never trigger any event in worker thread |
| conn->Detach(); |
| write(conn->GetFD(), "+OK\r\n", 5); |
| return Status::OK(); |
| } |
| |
| private: |
| rocksdb::SequenceNumber next_repl_seq = 0; |
| |
| // Return OK if the seq is in the range of the current WAL |
| Status checkWALBoundary(Engine::Storage *storage, |
| rocksdb::SequenceNumber seq) { |
| if (seq == storage->LatestSeq() + 1) { |
| return Status::OK(); |
| } |
| // Upper bound |
| if (seq > storage->LatestSeq() + 1) { |
| return Status(Status::NotOK); |
| } |
| // Lower bound |
| std::unique_ptr<rocksdb::TransactionLogIterator> iter; |
| auto s = storage->GetWALIter(seq, &iter); |
| if (s.IsOK() && iter->Valid()) { |
| auto batch = iter->GetBatch(); |
| if (seq < batch.sequence) { |
| return Status(Status::NotOK); |
| } |
| return Status::OK(); |
| } |
| return Status(Status::NotOK); |
| } |
| }; |
| |
| class CommandPerfLog : public Commander { |
| public: |
| CommandPerfLog() : Commander("perflog", -2, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| subcommand_ = Util::ToLower(args[1]); |
| if (subcommand_ != "reset" && subcommand_ != "get" && subcommand_ != "len") { |
| return Status(Status::NotOK, "PERFLOG subcommand must be one of RESET, LEN, GET"); |
| } |
| if (subcommand_ == "get" && args.size() >= 3) { |
| cnt = std::stoi(args[3]); |
| } |
| return Status::OK(); |
| } |
| |
| Status Execute(Server *srv, Connection *conn, std::string *output) override { |
| auto perf_log = srv->GetPerfLog(); |
| if (subcommand_ == "len") { |
| *output = Redis::Integer(perf_log->Len()); |
| } else if (subcommand_ == "reset") { |
| perf_log->Reset(); |
| *output = Redis::SimpleString("OK"); |
| } else if (subcommand_ == "get") { |
| *output = perf_log->ToString(cnt); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| std::string subcommand_; |
| int cnt = 0; |
| }; |
| |
| class CommandSlowlog : public Commander { |
| public: |
| CommandSlowlog() : Commander("slowlog", -2, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| subcommand_ = Util::ToLower(args[1]); |
| if ((subcommand_ == "reset" || subcommand_ == "len" || |
| subcommand_ == "get") && |
| args.size() == 2) { |
| return Status::OK(); |
| } |
| if (subcommand_ == "get" && args.size() == 3) { |
| try { |
| auto c = std::stoul(args[2]); |
| count_ = static_cast<uint32_t>(c); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "value is not an unsigned long or out of range"); |
| } |
| return Status::OK(); |
| } |
| return Status( |
| Status::RedisInvalidCmd, |
| "Unknown SLOWLOG subcommand or wrong # of args. Try GET, RESET, LEN."); |
| } |
| |
| Status Execute(Server *srv, Connection *conn, std::string *output) override { |
| if (subcommand_ == "reset") { |
| srv->SlowlogReset(); |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } else if (subcommand_ == "len") { |
| *output = Redis::Integer(srv->SlowlogLen()); |
| return Status::OK(); |
| } else if (subcommand_ == "get") { |
| srv->CreateSlowlogReply(output, count_); |
| return Status::OK(); |
| } |
| return Status( |
| Status::RedisInvalidCmd, |
| "Unknown SLOWLOG subcommand or wrong # of args. Try GET, RESET, LEN."); |
| } |
| |
| private: |
| std::string subcommand_; |
| uint32_t count_ = 10; |
| }; |
| |
| class CommandClient : public Commander { |
| public: |
| CommandClient() : Commander("client", -2, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| subcommand_ = Util::ToLower(args[1]); |
| // subcommand: getname id kill list setname |
| if ((subcommand_ == "id" || subcommand_ == "getname" || subcommand_ == "list") && args.size() == 2) { |
| return Status::OK(); |
| } |
| if ((subcommand_ == "setname") && args.size() == 3) { |
| conn_name_ = args[2]; |
| return Status::OK(); |
| } |
| if ((subcommand_ == "kill")) { |
| if (args.size() == 2) { |
| return Status(Status::RedisParseErr, "syntax error"); |
| } else if (args.size() == 3) { |
| addr_ = args[2]; |
| new_format_ = false; |
| return Status::OK(); |
| } |
| |
| uint i = 2; |
| new_format_ = true; |
| while (i < args.size()) { |
| bool moreargs = i < args.size(); |
| if (args[i] == "addr" && moreargs) { |
| addr_ = args[i+1]; |
| } else if (args[i] == "id" && moreargs) { |
| try { |
| id_ = std::stoll(args[i+1]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, kValueNotInterger); |
| } |
| } else if (args[i] == "skipme" && moreargs) { |
| if (args[i+1] == "yes") { |
| skipme_ = true; |
| } else if (args[i+1] == "no") { |
| skipme_ = false; |
| } else { |
| return Status(Status::RedisParseErr, "syntax error"); |
| } |
| } else { |
| return Status(Status::RedisParseErr, "syntax error"); |
| } |
| i += 2; |
| } |
| return Status::OK(); |
| } |
| return Status(Status::RedisInvalidCmd, |
| "Syntax error, try CLIENT LIST|KILL ip:port|GETNAME|SETNAME"); |
| } |
| |
| Status Execute(Server *srv, Connection *conn, std::string *output) override { |
| if (subcommand_ == "list") { |
| *output = Redis::BulkString(srv->GetClientsStr()); |
| return Status::OK(); |
| } else if (subcommand_ == "setname") { |
| conn->SetName(conn_name_); |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } else if (subcommand_ == "getname") { |
| std::string name = conn->GetName(); |
| *output = name== ""? Redis::NilString(): Redis::BulkString(name); |
| return Status::OK(); |
| } else if (subcommand_ == "id") { |
| *output = Redis::Integer(conn->GetID()); |
| return Status::OK(); |
| } else if (subcommand_ == "kill") { |
| int64_t killed = 0; |
| srv->KillClient(&killed, addr_, id_, skipme_, conn); |
| if (new_format_) { |
| *output = Redis::Integer(killed); |
| } else { |
| if (killed == 0) |
| *output = Redis::Error("No such client"); |
| else |
| *output = Redis::SimpleString("OK"); |
| } |
| return Status::OK(); |
| } |
| |
| return Status(Status::RedisInvalidCmd, |
| "Syntax error, try CLIENT LIST|KILL ip:port|GETNAME|SETNAME"); |
| } |
| |
| private: |
| std::string addr_; |
| std::string conn_name_; |
| std::string subcommand_; |
| bool skipme_ = false; |
| uint64_t id_ = 0; |
| bool new_format_ = true; |
| }; |
| |
| class CommandMonitor : public Commander { |
| public: |
| CommandMonitor() : Commander("monitor", 1, false) {} |
| Status Execute(Server *srv, Connection *conn, std::string *output) override { |
| conn->Owner()->BecomeMonitorConn(conn); |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandShutdown : public Commander { |
| public: |
| CommandShutdown() : Commander("shutdown", -1, false) {} |
| Status Execute(Server *srv, Connection *conn, std::string *output) override { |
| if (!conn->IsAdmin()) { |
| *output = Redis::Error("only administrator can use namespace command"); |
| return Status::OK(); |
| } |
| if (!srv->IsStopped()) { |
| LOG(INFO) << "bye bye"; |
| srv->Stop(); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandQuit : public Commander { |
| public: |
| CommandQuit() : Commander("quit", -1, false) {} |
| Status Execute(Server *srv, Connection *conn, std::string *output) override { |
| conn->EnableFlag(Redis::Connection::kCloseAfterReply); |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandScanBase : public Commander { |
| public: |
| explicit CommandScanBase(const std::string &name, int arity, bool is_write = false) |
| : Commander(name, arity, is_write) {} |
| Status ParseMatchAndCountParam(const std::string &type, const std::string &value) { |
| if (type == "match") { |
| prefix = std::move(value); |
| if (!prefix.empty() && prefix[prefix.size() - 1] == '*') { |
| prefix = prefix.substr(0, prefix.size() - 1); |
| return Status::OK(); |
| } |
| return Status(Status::RedisParseErr, "only keys prefix match was supported"); |
| } else if (type == "count") { |
| try { |
| limit = std::stoi(value); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "ERR count param should be type int"); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| void ParseCursor(const std::string ¶m) { |
| cursor = param; |
| if (cursor == "0") { |
| cursor = std::string(); |
| } |
| } |
| |
| std::string GenerateOutput(const std::vector<std::string> &keys) { |
| std::vector<std::string> list; |
| if (!keys.empty()) { |
| list.emplace_back(Redis::BulkString(keys.back())); |
| } else { |
| list.emplace_back(Redis::BulkString("0")); |
| } |
| |
| list.emplace_back(Redis::MultiBulkString(keys)); |
| |
| return Redis::Array(list); |
| } |
| |
| protected: |
| std::string cursor; |
| std::string prefix; |
| int limit = 20; |
| }; |
| |
| class CommandSubkeyScanBase : public CommandScanBase { |
| public: |
| explicit CommandSubkeyScanBase(const std::string &name, int arity, bool is_write = false) |
| : CommandScanBase(name, arity, is_write) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| if (args.size() % 2 == 0) { |
| return Status(Status::RedisParseErr, "wrong number of arguments"); |
| } |
| key = args[1]; |
| ParseCursor(args[2]); |
| if (args.size() >= 5) { |
| Status s = ParseMatchAndCountParam(Util::ToLower(args[3]), args_[4]); |
| if (!s.IsOK()) { |
| return s; |
| } |
| } |
| if (args.size() >= 7) { |
| Status s = ParseMatchAndCountParam(Util::ToLower(args[5]), args_[6]); |
| if (!s.IsOK()) { |
| return s; |
| } |
| } |
| return Commander::Parse(args); |
| } |
| |
| protected: |
| std::string key; |
| }; |
| |
| class CommandScan : public CommandScanBase { |
| public: |
| CommandScan() : CommandScanBase("scan", -2, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| if (args.size() % 2 != 0) { |
| return Status(Status::RedisParseErr, "wrong number of arguments"); |
| } |
| |
| ParseCursor(args[1]); |
| if (args.size() >= 4) { |
| Status s = ParseMatchAndCountParam(Util::ToLower(args[2]), args_[3]); |
| if (!s.IsOK()) { |
| return s; |
| } |
| } |
| if (args.size() >= 6) { |
| Status s = ParseMatchAndCountParam(Util::ToLower(args[4]), args_[5]); |
| if (!s.IsOK()) { |
| return s; |
| } |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Database redis_db(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> keys; |
| auto s = redis_db.Scan(cursor, limit, prefix, &keys); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| |
| *output = GenerateOutput(keys); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHScan : public CommandSubkeyScanBase { |
| public: |
| CommandHScan() : CommandSubkeyScanBase("hscan", -3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> fields; |
| auto s = hash_db.Scan(key, cursor, limit, prefix, &fields); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = GenerateOutput(fields); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSScan : public CommandSubkeyScanBase { |
| public: |
| CommandSScan() : CommandSubkeyScanBase("sscan", -3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> members; |
| auto s = set_db.Scan(key, cursor, limit, prefix, &members); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| |
| *output = GenerateOutput(members); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandZScan : public CommandSubkeyScanBase { |
| public: |
| CommandZScan() : CommandSubkeyScanBase("zscan", -3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> members; |
| auto s = zset_db.Scan(key, cursor, limit, prefix, &members); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| |
| *output = GenerateOutput(members); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandRandomKey : public Commander { |
| public: |
| CommandRandomKey() : Commander("randomkey", 1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::string key; |
| auto cursor = svr->GetLastRandomKeyCursor(); |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| redis.RandomKey(cursor, &key); |
| svr->SetLastRandomKeyCursor(key); |
| *output = Redis::BulkString(key); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandReplConf : public Commander { |
| public: |
| CommandReplConf() : Commander("replconf", -3, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| if (args.size() % 2 == 0) { |
| return Status(Status::RedisParseErr, "wrong number of arguments"); |
| } |
| if (args.size() >= 3) { |
| Status s = ParseParam(Util::ToLower(args[1]), args_[2]); |
| if (!s.IsOK()) { |
| return s; |
| } |
| } |
| if (args.size() >= 5) { |
| Status s = ParseParam(Util::ToLower(args[3]), args_[4]); |
| if (!s.IsOK()) { |
| return s; |
| } |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status ParseParam(const std::string &option, const std::string &value) { |
| if (option == "listening-port") { |
| try { |
| auto p = std::stoul(value); |
| if (p > UINT32_MAX) { |
| throw std::overflow_error("listening-port out of range"); |
| } |
| port_ = static_cast<uint32_t>(p); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "listening-port should be number"); |
| } |
| } else { |
| return Status(Status::RedisParseErr, "unknown option"); |
| } |
| return Status::OK(); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (port_ != 0) { |
| conn->SetListeningPort(port_); |
| } |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| |
| private: |
| uint32_t port_ = 0; |
| }; |
| |
| class CommandFetchMeta : public Commander { |
| public: |
| CommandFetchMeta() : Commander("_fetch_meta", 1, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| return Status::OK(); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| uint64_t file_size; |
| rocksdb::BackupID meta_id; |
| int fd; |
| auto s = Engine::Storage::BackupManager::OpenLatestMeta( |
| svr->storage_, &fd, &meta_id, &file_size); |
| if (!s.IsOK()) { |
| LOG(ERROR) << "Failed to open latest meta, err: " << s.Msg(); |
| return Status(Status::DBBackupFileErr, "can't create db backup"); |
| } |
| // Send the meta ID |
| conn->Reply(std::to_string(meta_id) + CRLF); |
| // Send meta file size |
| conn->Reply(std::to_string(file_size) + CRLF); |
| // Send meta content |
| conn->SendFile(fd); |
| svr->stats_.IncrFullSyncCounter(); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandFetchFile : public Commander { |
| public: |
| CommandFetchFile() : Commander("_fetch_file", 2, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| path_ = args[1]; |
| return Status::OK(); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| uint64_t file_size = 0; |
| auto fd = Engine::Storage::BackupManager::OpenDataFile(svr->storage_, path_, |
| &file_size); |
| if (fd < 0) return Status(Status::DBBackupFileErr); |
| conn->Reply(std::to_string(file_size) + CRLF); |
| conn->SendFile(fd); |
| return Status::OK(); |
| } |
| |
| private: |
| std::string path_; |
| }; |
| |
| class CommandDBName : public Commander { |
| public: |
| CommandDBName() : Commander("_db_name", 1, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| return Status::OK(); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| conn->Reply(svr->storage_->GetName() + CRLF); |
| return Status::OK(); |
| } |
| }; |
| |
| using CommanderFactory = std::function<std::unique_ptr<Commander>()>; |
| std::map<std::string, CommanderFactory> command_table = { |
| {"auth", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandAuth); |
| }}, |
| {"ping", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandPing); |
| }}, |
| {"select", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSelect); |
| }}, |
| {"info", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandInfo); |
| }}, |
| {"config", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandConfig); |
| }}, |
| {"namespace", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandNamespace); |
| }}, |
| {"keys", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandKeys); |
| }}, |
| {"flushdb", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandFlushDB); |
| }}, |
| {"dbsize", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandDBSize); |
| }}, |
| {"slowlog", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSlowlog); |
| }}, |
| {"perflog", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandPerfLog); |
| }}, |
| {"client", |
| []()->std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandClient); |
| }}, |
| {"monitor", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandMonitor); |
| }}, |
| {"shutdown", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandShutdown); |
| }}, |
| {"quit", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandQuit); |
| }}, |
| {"scan", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandScan); |
| }}, |
| {"randomkey", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandRandomKey); |
| }}, |
| // key command |
| {"ttl", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandTTL); |
| }}, |
| {"pttl", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandPTTL); |
| }}, |
| {"type", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandType); |
| }}, |
| {"object", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandObject); |
| }}, |
| {"exists", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandExists); |
| }}, |
| {"persist", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandPersist); |
| }}, |
| {"expire", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandExpire); |
| }}, |
| {"pexpire", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandPExpire); |
| }}, |
| {"expireat", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandExpireAt); |
| }}, |
| {"pexpireat", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandPExpireAt); |
| }}, |
| {"del", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandDel); |
| }}, |
| // string command |
| {"get", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandGet); |
| }}, |
| {"strlen", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandStrlen); |
| }}, |
| {"getset", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandGetSet); |
| }}, |
| {"getrange", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandGetRange); |
| }}, |
| {"setrange", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSetRange); |
| }}, |
| {"mget", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandMGet); |
| }}, |
| {"append", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandAppend); |
| }}, |
| {"set", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSet); |
| }}, |
| {"setex", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSetEX); |
| }}, |
| {"setnx", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSetNX); |
| }}, |
| {"mset", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandMSet); |
| }}, |
| {"incrby", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandIncrBy); |
| }}, |
| {"incrbyfloat", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandIncrByFloat); |
| }}, |
| {"incr", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandIncr); |
| }}, |
| {"decrby", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandDecrBy); |
| }}, |
| {"decr", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandDecr); |
| }}, |
| // bit command |
| {"getbit", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandGetBit); |
| }}, |
| {"setbit", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSetBit); |
| }}, |
| {"bitcount", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandBitCount); |
| }}, |
| {"bitpos", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandBitPos); |
| }}, |
| // hash command |
| {"hget", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandHGet); |
| }}, |
| {"hincrby", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandHIncrBy); |
| }}, |
| {"hincrbyfloat", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandHIncrByFloat); |
| }}, |
| {"hset", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandHSet); |
| }}, |
| {"hsetnx", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandHSetNX); |
| }}, |
| {"hdel", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandHDel); |
| }}, |
| {"hstrlen", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandHStrlen); |
| }}, |
| {"hexists", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandHExists); |
| }}, |
| {"hlen", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandHLen); |
| }}, |
| {"hmget", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandHMGet); |
| }}, |
| {"hmset", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandHMSet); |
| }}, |
| {"hkeys", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandHKeys); |
| }}, |
| {"hvals", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandHVals); |
| }}, |
| {"hgetall", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandHGetAll); |
| }}, |
| {"hscan", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandHScan); |
| }}, |
| // list command |
| {"lpush", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandLPush); |
| }}, |
| {"rpush", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandRPush); |
| }}, |
| {"lpushx", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandLPushX); |
| }}, |
| {"rpushx", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandRPushX); |
| }}, |
| {"lpop", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandLPop); |
| }}, |
| {"rpop", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandRPop); |
| }}, |
| {"blpop", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandBLPop); |
| }}, |
| {"brpop", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandBRPop); |
| }}, |
| {"lrem", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandLRem); |
| }}, |
| {"linsert", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandLInsert); |
| }}, |
| {"lrange", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandLRange); |
| }}, |
| {"lindex", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandLIndex); |
| }}, |
| {"ltrim", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandLTrim); |
| }}, |
| {"llen", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandLLen); |
| }}, |
| {"lset", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandLSet); |
| }}, |
| {"rpoplpush", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandRPopLPUSH); |
| }}, |
| // set command |
| {"sadd", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSAdd); |
| }}, |
| {"srem", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSRem); |
| }}, |
| {"scard", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSCard); |
| }}, |
| {"smembers", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSMembers); |
| }}, |
| {"sismember", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSIsMember); |
| }}, |
| {"spop", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSPop); |
| }}, |
| {"srandmember", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSRandMember); |
| }}, |
| {"smove", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSMove); |
| }}, |
| {"sdiff", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSDiff); |
| }}, |
| {"sunion", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSUnion); |
| }}, |
| {"sinter", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSInter); |
| }}, |
| {"sdiffstore", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSDiffStore); |
| }}, |
| {"sunionstore", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSUnionStore); |
| }}, |
| {"sinterstore", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSInterStore); |
| }}, |
| {"sscan", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSScan); |
| }}, |
| // zset command |
| {"zadd", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZAdd); |
| }}, |
| {"zcard", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZCard); |
| }}, |
| {"zcount", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZCount); |
| }}, |
| {"zincrby", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZIncrBy); |
| }}, |
| {"zinterstore", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZInterStore); |
| }}, |
| {"zlexcount", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZLexCount); |
| }}, |
| {"zpopmax", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZPopMax); |
| }}, |
| {"zpopmin", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZPopMin); |
| }}, |
| {"zrange", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZRange); |
| }}, |
| {"zrevrange", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZRevRange); |
| }}, |
| {"zrangebylex", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZRangeByLex); |
| }}, |
| {"zrangebyscore", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZRangeByScore); |
| }}, |
| {"zrank", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZRank); |
| }}, |
| {"zrem", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZRem); |
| }}, |
| {"zremrangebyrank", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZRemRangeByRank); |
| }}, |
| {"zremrangebyscore", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZRemRangeByScore); |
| }}, |
| {"zremrangebylex", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZRemRangeByLex); |
| }}, |
| {"zrevrangebyscore", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZRevRangeByScore); |
| }}, |
| {"zrevrank", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZRevRank); |
| }}, |
| {"zscore", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZScore); |
| }}, |
| {"zscan", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZScan); |
| }}, |
| {"zunionstore", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandZUnionStore); |
| }}, |
| // pub/sub command |
| {"publish", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandPublish); |
| }}, |
| {"subscribe", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSubscribe); |
| }}, |
| {"unsubscribe", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandUnSubscribe); |
| }}, |
| {"psubscribe", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandPSubscribe); |
| }}, |
| {"punsubscribe", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandPUnSubscribe); |
| }}, |
| {"pubsub", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandPubSub); |
| }}, |
| |
| // internal management cmd |
| {"compact", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandCompact); |
| }}, |
| {"bgsave", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandBGSave); |
| }}, |
| {"slaveof", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandSlaveOf); |
| }}, |
| {"stats", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandStats); |
| }}, |
| }; |
| |
| // Replication related commands, which are received by workers listening on |
| // `repl-port` |
| std::map<std::string, CommanderFactory> repl_command_table = { |
| {"auth", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandAuth); |
| }}, |
| {"replconf", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandReplConf); |
| }}, |
| {"psync", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandPSync); |
| }}, |
| {"_fetch_meta", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandFetchMeta); |
| }}, |
| {"_fetch_file", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandFetchFile); |
| }}, |
| {"_db_name", |
| []() -> std::unique_ptr<Commander> { |
| return std::unique_ptr<Commander>(new CommandDBName); |
| }}, |
| }; |
| |
| Status LookupCommand(const std::string &cmd_name, |
| std::unique_ptr<Commander> *cmd, bool is_repl) { |
| if (cmd_name.empty()) return Status(Status::RedisUnknownCmd); |
| if (is_repl) { |
| auto cmd_factory = repl_command_table.find(Util::ToLower(cmd_name)); |
| if (cmd_factory == repl_command_table.end()) { |
| return Status(Status::RedisUnknownCmd); |
| } |
| *cmd = cmd_factory->second(); |
| } else { |
| auto cmd_factory = command_table.find(Util::ToLower(cmd_name)); |
| if (cmd_factory == command_table.end()) { |
| return Status(Status::RedisUnknownCmd); |
| } |
| *cmd = cmd_factory->second(); |
| } |
| return Status::OK(); |
| } |
| |
| bool IsCommandExists(const std::string &cmd) { |
| return command_table.find(cmd) != command_table.end(); |
| } |
| |
| void GetCommandList(std::vector<std::string> *cmds) { |
| cmds->clear(); |
| for (const auto &cmd : command_table) { |
| cmds->emplace_back(cmd.first); |
| } |
| for (const auto &cmd : repl_command_table) { |
| cmds->emplace_back(cmd.first); |
| } |
| } |
| } // namespace Redis |