| #include <arpa/inet.h> |
| #include <fcntl.h> |
| #include <glog/logging.h> |
| #include <sys/socket.h> |
| #include <algorithm> |
| #include <cctype> |
| #include <chrono> |
| #include <thread> |
| #include <utility> |
| #include <memory> |
| |
| #include "redis_db.h" |
| #include "redis_cmd.h" |
| #include "redis_hash.h" |
| #include "redis_bitmap.h" |
| #include "redis_list.h" |
| #include "redis_request.h" |
| #include "redis_connection.h" |
| #include "redis_set.h" |
| #include "redis_string.h" |
| #include "redis_zset.h" |
| #include "redis_geo.h" |
| #include "redis_pubsub.h" |
| #include "redis_sortedint.h" |
| #include "redis_slot.h" |
| #include "replication.h" |
| #include "util.h" |
| #include "storage.h" |
| #include "worker.h" |
| #include "server.h" |
| #include "log_collector.h" |
| |
| namespace Redis { |
| |
| const char *errInvalidSyntax = "syntax error"; |
| const char *errWrongNumOfArguments = "wrong number of arguments"; |
| const char *errValueNotInterger = "value is not an integer or out of range"; |
| const char *errAdministorPermissionRequired = "administor permission required to perform the command"; |
| |
| class CommandAuth : public Commander { |
| public: |
| CommandAuth() : Commander("auth", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Config *config = svr->GetConfig(); |
| auto user_password = args_[1]; |
| if (!conn->IsRepl()) { |
| auto iter = config->tokens.find(user_password); |
| if (iter != config->tokens.end()) { |
| conn->SetNamespace(iter->second); |
| conn->BecomeUser(); |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| } |
| const auto requirepass = conn->IsRepl() ? config->masterauth : config->requirepass; |
| if (!requirepass.empty() && user_password != requirepass) { |
| *output = Redis::Error("ERR invaild password"); |
| return Status::OK(); |
| } |
| conn->SetNamespace(kDefaultNamespace); |
| conn->BecomeAdmin(); |
| if (requirepass.empty()) { |
| *output = Redis::Error("ERR Client sent AUTH, but no password is set"); |
| } else { |
| *output = Redis::SimpleString("OK"); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandNamespace : public Commander { |
| public: |
| CommandNamespace() : Commander("namespace", -3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (!conn->IsAdmin()) { |
| *output = Redis::Error(errAdministorPermissionRequired); |
| return Status::OK(); |
| } |
| Config *config = svr->GetConfig(); |
| if (args_.size() == 3 && args_[1] == "get") { |
| if (args_[2] == "*") { |
| std::vector<std::string> namespaces; |
| auto tokens = config->tokens; |
| for (auto iter = tokens.begin(); iter != tokens.end(); iter++) { |
| namespaces.emplace_back(iter->second); // namespace |
| namespaces.emplace_back(iter->first); // token |
| } |
| *output = Redis::MultiBulkString(namespaces); |
| } else { |
| std::string token; |
| config->GetNamespace(args_[2], &token); |
| *output = Redis::BulkString(token); |
| } |
| } else if (args_.size() == 4 && args_[1] == "set") { |
| Status s = config->SetNamespace(args_[2], args_[3]); |
| *output = s.IsOK() ? Redis::SimpleString("OK") : Redis::Error(s.Msg()); |
| LOG(WARNING) << "Updated namespace: " << args_[2] << " with token: " << args_[3] |
| << ", addr: " << conn->GetAddr() << ", result: " << s.Msg(); |
| } else if (args_.size() == 4 && args_[1] == "add") { |
| Status s = config->AddNamespace(args_[2], args_[3]); |
| *output = s.IsOK() ? Redis::SimpleString("OK") : Redis::Error(s.Msg()); |
| LOG(WARNING) << "New namespace: " << args_[2] << " with token: " << args_[3] |
| << ", addr: " << conn->GetAddr() << ", result: " << s.Msg(); |
| } else if (args_.size() == 3 && args_[1] == "del") { |
| Status s = config->DelNamespace(args_[2]); |
| *output = s.IsOK() ? Redis::SimpleString("OK") : Redis::Error(s.Msg()); |
| LOG(WARNING) << "Deleted namespace: " << args_[2] |
| << ", addr: " << conn->GetAddr() << ", result: " << s.Msg(); |
| } else { |
| *output = Redis::Error( |
| "NAMESPACE subcommand must be one of GET, SET, DEL, ADD"); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandKeys : public Commander { |
| public: |
| CommandKeys() : Commander("keys", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::string prefix = args_[1]; |
| std::vector<std::string> keys; |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| if (prefix == "*") { |
| redis.Keys(std::string(), &keys); |
| } else { |
| if (prefix[prefix.size() - 1] != '*') { |
| *output = Redis::Error("ERR only keys prefix match was supported"); |
| return Status::OK(); |
| } |
| redis.Keys(prefix.substr(0, prefix.size() - 1), &keys); |
| } |
| *output = Redis::MultiBulkString(keys); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandFlushDB : public Commander { |
| public: |
| CommandFlushDB() : Commander("flushdb", 1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = redis.FlushDB(); |
| LOG(WARNING) << "DB keys in namespce: " << conn->GetNamespace() |
| << " was flused, addr: " << conn->GetAddr(); |
| if (s.ok()) { |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| }; |
| |
| class CommandFlushAll : public Commander { |
| public: |
| CommandFlushAll() : Commander("flushall", 1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (!conn->IsAdmin()) { |
| *output = Redis::Error(errAdministorPermissionRequired); |
| return Status::OK(); |
| } |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = redis.FlushAll(); |
| LOG(WARNING) << "All DB keys was flushed, addr: " << conn->GetAddr(); |
| if (s.ok()) { |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| }; |
| |
| class CommandPing : public Commander { |
| public: |
| CommandPing() : Commander("ping", 1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| *output = Redis::SimpleString("PONG"); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSelect: public Commander { |
| public: |
| CommandSelect() : Commander("select", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandConfig : public Commander { |
| public: |
| CommandConfig() : Commander("config", -2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (!conn->IsAdmin()) { |
| *output = Redis::Error(errAdministorPermissionRequired); |
| return Status::OK(); |
| } |
| Config *config = svr->GetConfig(); |
| std::string sub_command = Util::ToLower(args_[1]); |
| if ((sub_command == "rewrite" && args_.size() != 2) || |
| (sub_command == "get" && args_.size() != 3) || |
| (sub_command == "set" && args_.size() != 4)) { |
| *output = Redis::Error(errWrongNumOfArguments); |
| return Status::OK(); |
| } |
| if (args_.size() == 2 && sub_command == "rewrite") { |
| Status s = config->Rewrite(); |
| if (!s.IsOK()) return Status(Status::RedisExecErr, s.Msg()); |
| *output = Redis::SimpleString("OK"); |
| LOG(INFO) << "# CONFIG REWRITE executed with success"; |
| } else if (args_.size() == 3 && sub_command == "get") { |
| std::vector<std::string> values; |
| config->Get(args_[2], &values); |
| *output = Redis::MultiBulkString(values); |
| } else if (args_.size() == 4 && sub_command == "set") { |
| Status s = config->Set(svr, args_[2], args_[3]); |
| if (!s.IsOK()) { |
| *output = Redis::Error("CONFIG SET '"+args_[2]+"' error: "+s.Msg()); |
| } else { |
| *output = Redis::SimpleString("OK"); |
| } |
| } else { |
| *output = Redis::Error("CONFIG subcommand must be one of GET, SET, REWRITE"); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandGet : public Commander { |
| public: |
| CommandGet() : Commander("get", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::string value; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.Get(args_[1], &value); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = s.IsNotFound() ? Redis::NilString() : Redis::BulkString(value); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandStrlen: public Commander { |
| public: |
| CommandStrlen() : Commander("strlen", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::string value; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.Get(args_[1], &value); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if (s.IsNotFound()) { |
| *output = Redis::Integer(0); |
| } else { |
| *output = Redis::Integer(value.size()); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandGetSet : public Commander { |
| public: |
| CommandGetSet() : Commander("getset", 3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| std::string old_value; |
| rocksdb::Status s = string_db.GetSet(args_[1], args_[2], &old_value); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::BulkString(old_value); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandGetRange: public Commander { |
| public: |
| CommandGetRange() : Commander("getrange", 4, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| start_ = std::stoi(args[2]); |
| stop_ = std::stoi(args[3]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::string value; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.Get(args_[1], &value); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if (s.IsNotFound()) { |
| *output = Redis::NilString(); |
| return Status::OK(); |
| } |
| if (start_ < 0) start_ = static_cast<int>(value.size()) + start_; |
| if (stop_ < 0) stop_ = static_cast<int>(value.size()) + stop_; |
| if (start_ < 0) start_ = 0; |
| if (stop_ > static_cast<int>(value.size())) stop_ = static_cast<int>(value.size()); |
| if (start_ > stop_) { |
| *output = Redis::NilString(); |
| } else { |
| *output = Redis::BulkString(value.substr(start_, stop_+1)); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| int start_ = 0, stop_ = 0; |
| }; |
| |
| class CommandSetRange: public Commander { |
| public: |
| CommandSetRange() : Commander("setrange", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| offset_ = std::stoi(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.SetRange(args_[1], offset_, args_[3], &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| int offset_ = 0; |
| }; |
| |
| class CommandMGet : public Commander { |
| public: |
| CommandMGet() : Commander("mget", -2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| std::vector<Slice> keys; |
| for (size_t i = 1; i < args_.size(); i++) { |
| keys.emplace_back(args_[i]); |
| } |
| std::vector<std::string> values; |
| // always return OK |
| string_db.MGet(keys, &values); |
| *output = Redis::MultiBulkString(values); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandAppend: public Commander { |
| public: |
| CommandAppend() : Commander("append", 3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.Append(args_[1], args_[2], &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSet : public Commander { |
| public: |
| CommandSet() : Commander("set", -3, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| bool last_arg; |
| for (size_t i = 3; i < args.size(); i++) { |
| last_arg = (i == args.size()-1); |
| std::string opt = Util::ToLower(args[i]); |
| if (opt == "nx") { |
| nx_ = true; |
| } else if (opt == "xx") { |
| xx_ = true; |
| } else if (opt == "ex") { |
| if (last_arg) return Status(Status::NotOK, errInvalidSyntax); |
| ttl_ = atoi(args_[++i].c_str()); |
| } else if (opt == "px") { |
| if (last_arg) return Status(Status::NotOK, errInvalidSyntax); |
| auto ttl_ms = atol(args[++i].c_str()); |
| if (ttl_ms > 0 && ttl_ms < 1000) { |
| // round up the pttl to second |
| ttl_ = 1; |
| } else { |
| ttl_ = static_cast<int>(ttl_ms/1000); |
| } |
| } else { |
| return Status(Status::NotOK, errInvalidSyntax); |
| } |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s; |
| if (nx_) { |
| s = string_db.SetNX(args_[1], args_[2], ttl_, &ret); |
| } else if (xx_) { |
| s = string_db.SetXX(args_[1], args_[2], ttl_, &ret); |
| } else { |
| s = string_db.SetEX(args_[1], args_[2], ttl_); |
| } |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if ((nx_ || xx_) && !ret) { |
| *output = Redis::NilString(); |
| } else { |
| *output = Redis::SimpleString("OK"); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| bool xx_ = false; |
| bool nx_ = false; |
| int ttl_ = 0; |
| }; |
| |
| class CommandSetEX : public Commander { |
| public: |
| CommandSetEX() : Commander("setex", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| ttl_ = std::stoi(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.SetEX(args_[1], args_[3], ttl_); |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| |
| private: |
| int ttl_ = 0; |
| }; |
| |
| class CommandMSet : public Commander { |
| public: |
| CommandMSet() : Commander("mset", -3, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| if (args.size() % 2 != 1) { |
| return Status(Status::RedisParseErr, errWrongNumOfArguments); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| std::vector<StringPair> kvs; |
| for (size_t i = 1; i < args_.size(); i+=2) { |
| kvs.emplace_back(StringPair{args_[i], args_[i+1]}); |
| } |
| rocksdb::Status s = string_db.MSet(kvs); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSetNX : public Commander { |
| public: |
| CommandSetNX() : Commander("setnx", 3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.SetNX(args_[1], args_[2], 0, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandIncr : public Commander { |
| public: |
| CommandIncr() : Commander("incr", 2, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int64_t ret; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.IncrBy(args_[1], 1, &ret); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandDecr : public Commander { |
| public: |
| CommandDecr() : Commander("decr", 2, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int64_t ret; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.IncrBy(args_[1], -1, &ret); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandIncrBy : public Commander { |
| public: |
| CommandIncrBy() : Commander("incrby", 3, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| increment_ = std::stoll(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int64_t ret; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.IncrBy(args_[1], increment_, &ret); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| int64_t increment_ = 0; |
| }; |
| |
| class CommandIncrByFloat : public Commander { |
| public: |
| CommandIncrByFloat() : Commander("incrbyfloat", 3, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| increment_ = std::stof(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| float ret; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.IncrByFloat(args_[1], increment_, &ret); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::BulkString(std::to_string(ret)); |
| return Status::OK(); |
| } |
| |
| private: |
| float increment_ = 0; |
| }; |
| |
| class CommandDecrBy : public Commander { |
| public: |
| CommandDecrBy() : Commander("decrby", 3, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| increment_ = std::stoll(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int64_t ret; |
| Redis::String string_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = string_db.IncrBy(args_[1], -1 * increment_, &ret); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| int64_t increment_ = 0; |
| }; |
| |
| class CommandDel : public Commander { |
| public: |
| CommandDel() : Commander("del", -2, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int cnt = 0; |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| for (unsigned int i = 1; i < args_.size(); i++) { |
| rocksdb::Status s = redis.Del(args_[i]); |
| if (s.ok()) cnt++; |
| } |
| *output = Redis::Integer(cnt); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandGetBit : public Commander { |
| public: |
| CommandGetBit() : Commander("getbit", 3, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| offset_ = std::stoul(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| bool bit; |
| Redis::Bitmap bitmap_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = bitmap_db.GetBit(args_[1], offset_, &bit); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::Integer(bit? 1 : 0); |
| return Status::OK(); |
| } |
| private: |
| uint32_t offset_ = 0; |
| }; |
| |
| class CommandSetBit : public Commander { |
| public: |
| CommandSetBit() : Commander("setbit", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| offset_ = std::stoul(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| if (args[3] == "0") { |
| bit_ = false; |
| } else if (args[3] == "1") { |
| bit_ = true; |
| } else { |
| return Status(Status::RedisParseErr, "bit should be 0 or 1"); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| bool old_bit; |
| Redis::Bitmap bitmap_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = bitmap_db.SetBit(args_[1], offset_, bit_, &old_bit); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::Integer(old_bit? 1 : 0); |
| return Status::OK(); |
| } |
| |
| private: |
| uint32_t offset_ = 0; |
| bool bit_ = false; |
| }; |
| |
| class CommandMSetBit : public Commander { |
| public: |
| CommandMSetBit() : Commander("msetbit", -4, true) {} |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Bitmap bitmap_db(svr->storage_, conn->GetNamespace()); |
| std::vector<BitmapPair> kvs; |
| uint32_t index; |
| for (size_t i = 2; i < args_.size(); i += 2) { |
| try { |
| index = std::stoi(args_[i]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| kvs.emplace_back(BitmapPair{index, args_[i + 1]}); |
| } |
| rocksdb::Status s = bitmap_db.MSetBit(args_[1], kvs); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandBitCount : public Commander { |
| public: |
| CommandBitCount() : Commander("bitcount", -2, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| if (args.size() >= 3) start_ = std::stoi(args[2]); |
| if (args.size() >= 4) stop_ = std::stoi(args[3]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| uint32_t cnt; |
| Redis::Bitmap bitmap_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = bitmap_db.BitCount(args_[1], start_, stop_, &cnt); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::Integer(cnt); |
| return Status::OK(); |
| } |
| private: |
| int start_ = 0, stop_ = -1; |
| }; |
| |
| class CommandBitPos: public Commander { |
| public: |
| CommandBitPos() : Commander("bitcount", -3, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| if (args.size() >= 4) start_ = std::stoi(args[3]); |
| if (args.size() >= 5) stop_ = std::stoi(args[4]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| if (args[2] == "0") { |
| bit_ = false; |
| } else if (args[2] == "1") { |
| bit_ = true; |
| } else { |
| return Status(Status::RedisParseErr, "bit should be 0 or 1"); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int pos; |
| Redis::Bitmap bitmap_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = bitmap_db.BitPos(args_[1], bit_, start_, stop_, &pos); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::Integer(pos); |
| return Status::OK(); |
| } |
| |
| private: |
| int start_ = 0, stop_ = -1; |
| bool bit_ = false; |
| }; |
| |
| class CommandType : public Commander { |
| public: |
| CommandType() : Commander("type", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| RedisType type; |
| rocksdb::Status s = redis.Type(args_[1], &type); |
| if (s.ok()) { |
| *output = Redis::BulkString(RedisTypeNames[type]); |
| return Status::OK(); |
| } |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| }; |
| |
| class CommandObject : public Commander { |
| public: |
| CommandObject() : Commander("object", 3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (Util::ToLower(args_[1]) == "dump") { |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> infos; |
| rocksdb::Status s = redis.Dump(args_[2], &infos); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| output->append(Redis::MultiLen(infos.size())); |
| for (const auto info : infos) { |
| output->append(Redis::BulkString(info)); |
| } |
| } else { |
| *output = Redis::Error("object subcommand must be dump"); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandTTL : public Commander { |
| public: |
| CommandTTL() : Commander("ttl", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| int ttl; |
| rocksdb::Status s = redis.TTL(args_[1], &ttl); |
| if (s.ok()) { |
| *output = Redis::Integer(ttl); |
| return Status::OK(); |
| } else { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| } |
| }; |
| |
| class CommandPTTL : public Commander { |
| public: |
| CommandPTTL() : Commander("pttl", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| int ttl; |
| rocksdb::Status s = redis.TTL(args_[1], &ttl); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| if (ttl > 0) { |
| *output = Redis::Integer(ttl*1000); |
| } else { |
| *output = Redis::Integer(ttl); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandExists : public Commander { |
| public: |
| CommandExists() : Commander("exists", -2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int cnt = 0; |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| std::vector<rocksdb::Slice> keys; |
| for (unsigned i = 1; i < args_.size(); i++) { |
| keys.emplace_back(args_[i]); |
| } |
| redis.Exists(keys, &cnt); |
| *output = Redis::Integer(cnt); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandExpire : public Commander { |
| public: |
| CommandExpire() : Commander("expire", 3, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| int64_t now; |
| rocksdb::Env::Default()->GetCurrentTime(&now); |
| try { |
| seconds_ = std::stoi(args[2]); |
| if (seconds_ >= INT32_MAX - now) { |
| return Status(Status::RedisParseErr, "the expire time was overflow"); |
| } |
| seconds_ += now; |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = redis.Expire(args_[1], seconds_); |
| if (s.ok()) { |
| *output = Redis::Integer(1); |
| } else { |
| *output = Redis::Integer(0); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| int seconds_ = 0; |
| }; |
| |
| class CommandPExpire : public Commander { |
| public: |
| CommandPExpire() : Commander("pexpire", 3, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| int64_t now; |
| rocksdb::Env::Default()->GetCurrentTime(&now); |
| try { |
| auto ttl_ms = std::stol(args[2]); |
| if (ttl_ms > 0 && ttl_ms < 1000) { |
| seconds_ = 1; |
| } else { |
| seconds_ = ttl_ms / 1000; |
| if (seconds_ >= INT32_MAX - now) { |
| return Status(Status::RedisParseErr, "the expire time was overflow"); |
| } |
| } |
| seconds_ += now; |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = redis.Expire(args_[1], seconds_); |
| if (s.ok()) { |
| *output = Redis::Integer(1); |
| } else { |
| *output = Redis::Integer(0); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| int seconds_ = 0; |
| }; |
| |
| class CommandExpireAt : public Commander { |
| public: |
| CommandExpireAt() : Commander("expireat", 3, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| timestamp_ = std::stoi(args[2]); |
| if (timestamp_ >= INT32_MAX) { |
| return Status(Status::RedisParseErr, "the expire time was overflow"); |
| } |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = redis.Expire(args_[1], timestamp_); |
| if (s.ok()) { |
| *output = Redis::Integer(1); |
| } else { |
| *output = Redis::Integer(0); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| int timestamp_ = 0; |
| }; |
| |
| class CommandPExpireAt : public Commander { |
| public: |
| CommandPExpireAt() : Commander("pexpireat", 3, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| timestamp_ = static_cast<int>(std::stol(args[2])/1000); |
| if (timestamp_ >= INT32_MAX) { |
| return Status(Status::RedisParseErr, "the expire time was overflow"); |
| } |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = redis.Expire(args_[1], timestamp_); |
| if (s.ok()) { |
| *output = Redis::Integer(1); |
| } else { |
| *output = Redis::Integer(0); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| int timestamp_ = 0; |
| }; |
| |
| class CommandPersist : public Commander { |
| public: |
| CommandPersist() : Commander("persist", 2, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ttl; |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = redis.TTL(args_[1], &ttl); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| if (ttl == -1 || ttl == -2) { |
| *output = Redis::Integer(0); |
| return Status::OK(); |
| } |
| s = redis.Expire(args_[1], 0); |
| if (!s.ok()) return Status(Status::RedisExecErr, s.ToString()); |
| *output = Redis::Integer(1); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHGet : public Commander { |
| public: |
| CommandHGet() : Commander("hget", 3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::string value; |
| rocksdb::Status s = hash_db.Get(args_[1], args_[2], &value); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = s.IsNotFound() ? Redis::NilString() : Redis::BulkString(value); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHSet : public Commander { |
| public: |
| CommandHSet() : Commander("hset", 4, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = hash_db.Set(args_[1], args_[2], args_[3], &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHSetNX : public Commander { |
| public: |
| CommandHSetNX() : Commander("hsetnx", 4, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = hash_db.SetNX(args_[1], args_[2], args_[3], &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHStrlen : public Commander { |
| public: |
| CommandHStrlen() : Commander("hstrlen", 3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::string value; |
| rocksdb::Status s = hash_db.Get(args_[1], args_[2], &value); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(static_cast<int>(value.size())); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHDel : public Commander { |
| public: |
| CommandHDel() : Commander("hdel", -3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::vector<Slice> fields; |
| for (unsigned int i = 2; i < args_.size(); i++) { |
| fields.emplace_back(Slice(args_[i])); |
| } |
| rocksdb::Status s = hash_db.Delete(args_[1], fields, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHExists : public Commander { |
| public: |
| CommandHExists() : Commander("hexists", 3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::string value; |
| rocksdb::Status s = hash_db.Get(args_[1], args_[2], &value); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = s.IsNotFound() ? Redis::Integer(0) : Redis::Integer(1); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHLen : public Commander { |
| public: |
| CommandHLen() : Commander("hlen", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| uint32_t count; |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = hash_db.Size(args_[1], &count); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = s.IsNotFound() ? Redis::Integer(0) : Redis::Integer(count); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHIncrBy : public Commander { |
| public: |
| CommandHIncrBy() : Commander("hincrby", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| increment_ = std::stoll(args[3]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int64_t ret; |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = hash_db.IncrBy(args_[1], args_[2], increment_, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| int64_t increment_ = 0; |
| }; |
| |
| class CommandHIncrByFloat : public Commander { |
| public: |
| CommandHIncrByFloat() : Commander("hincrbyfloat", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| increment_ = std::stof(args[3]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| float ret; |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = hash_db.IncrByFloat(args_[1], args_[2], increment_, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::BulkString(std::to_string(ret)); |
| return Status::OK(); |
| } |
| |
| private: |
| float increment_ = 0; |
| }; |
| |
| class CommandHMGet : public Commander { |
| public: |
| CommandHMGet() : Commander("hmget", -3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::vector<Slice> fields; |
| for (unsigned int i = 2; i < args_.size(); i++) { |
| fields.emplace_back(Slice(args_[i])); |
| } |
| std::vector<std::string> values; |
| rocksdb::Status s = hash_db.MGet(args_[1], fields, &values); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if (s.IsNotFound()) { |
| values.resize(fields.size(), ""); |
| } |
| *output = Redis::MultiBulkString(values); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHMSet : public Commander { |
| public: |
| CommandHMSet() : Commander("hmset", -4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| if (args.size() % 2 != 0) { |
| return Status(Status::RedisParseErr, errWrongNumOfArguments); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::vector<FieldValue> field_values; |
| for (unsigned int i = 2; i < args_.size(); i += 2) { |
| field_values.push_back(FieldValue{args_[i], args_[i + 1]}); |
| } |
| rocksdb::Status s = hash_db.MSet(args_[1], field_values, false, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHKeys : public Commander { |
| public: |
| CommandHKeys() : Commander("hkeys", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::vector<FieldValue> field_values; |
| rocksdb::Status s = hash_db.GetAll(args_[1], &field_values, HashFetchType::kOnlyKey); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| std::vector<std::string> keys; |
| for (const auto fv : field_values) { |
| keys.emplace_back(fv.field); |
| } |
| *output = Redis::MultiBulkString(keys); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHVals : public Commander { |
| public: |
| CommandHVals() : Commander("hvals", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::vector<FieldValue> field_values; |
| rocksdb::Status s = hash_db.GetAll(args_[1], &field_values, HashFetchType::kOnlyValue); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| std::vector<std::string> values; |
| for (const auto fv : field_values) { |
| values.emplace_back(fv.value); |
| } |
| *output = Redis::MultiBulkString(values); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHGetAll : public Commander { |
| public: |
| CommandHGetAll() : Commander("hgetall", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::vector<FieldValue> field_values; |
| rocksdb::Status s = hash_db.GetAll(args_[1], &field_values); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = "*" + std::to_string(field_values.size() * 2) + CRLF; |
| for (const auto fv : field_values) { |
| *output += Redis::BulkString(fv.field); |
| *output += Redis::BulkString(fv.value); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandPush : public Commander { |
| public: |
| CommandPush(bool create_if_missing, bool left) |
| : Commander("push", -3, true) { |
| left_ = left; |
| create_if_missing_ = create_if_missing; |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| std::vector<Slice> elems; |
| for (unsigned int i = 2; i < args_.size(); i++) { |
| elems.emplace_back(args_[i]); |
| } |
| int ret; |
| rocksdb::Status s; |
| if (create_if_missing_) { |
| s = list_db.Push(args_[1], elems, left_, &ret); |
| } else { |
| s = list_db.PushX(args_[1], elems, left_, &ret); |
| } |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| |
| svr->WakeupBlockingConns(args_[1], elems.size()); |
| |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| bool left_; |
| bool create_if_missing_; |
| }; |
| |
| class CommandLPush : public CommandPush { |
| public: |
| CommandLPush() : CommandPush(true, true) { name_ = "lpush"; } |
| }; |
| |
| class CommandRPush : public CommandPush { |
| public: |
| CommandRPush() : CommandPush(true, false) { name_ = "rpush"; } |
| }; |
| |
| class CommandLPushX : public CommandPush { |
| public: |
| CommandLPushX() : CommandPush(false, true) { name_ = "lpushx"; } |
| }; |
| |
| class CommandRPushX : public CommandPush { |
| public: |
| CommandRPushX() : CommandPush(false, false) { name_ = "rpushx"; } |
| }; |
| |
| class CommandPop : public Commander { |
| public: |
| explicit CommandPop(bool left) : Commander("pop", 2, true) { left_ = left; } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| std::string elem; |
| rocksdb::Status s = list_db.Pop(args_[1], &elem, left_); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if (s.IsNotFound()) { |
| *output = Redis::NilString(); |
| } else { |
| *output = Redis::BulkString(elem); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| bool left_; |
| }; |
| |
| class CommandLPop : public CommandPop { |
| public: |
| CommandLPop() : CommandPop(true) { name_ = "lpop"; } |
| }; |
| |
| class CommandRPop : public CommandPop { |
| public: |
| CommandRPop() : CommandPop(false) { name_ = "rpop"; } |
| }; |
| |
| class CommandBPop : public Commander { |
| public: |
| explicit CommandBPop(bool left) : Commander("bpop", -3, true) { left_ = left; } |
| ~CommandBPop() { |
| if (timer_ != nullptr) { |
| event_free(timer_); |
| timer_ = nullptr; |
| } |
| } |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| timeout_ = std::stoi(args[args.size() - 1]); |
| if (timeout_ < 0) { |
| return Status(Status::RedisParseErr, "timeout should not be negative"); |
| } |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, "timeout is not an integer or out of range"); |
| } |
| keys_ = std::vector<std::string>(args.begin() + 1, args.end() - 1); |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| svr_ = svr; |
| conn_ = conn; |
| |
| auto bev = conn->GetBufferEvent(); |
| auto s = TryPopFromList(); |
| if (s.ok() || !s.IsNotFound()) { |
| return Status::OK(); // error has already output in TryPopFromList |
| } |
| for (const auto &key : keys_) { |
| svr_->AddBlockingKey(key, conn_); |
| } |
| bufferevent_setcb(bev, nullptr, WriteCB, EventCB, this); |
| if (timeout_) { |
| timer_ = evtimer_new(bufferevent_get_base(bev), TimerCB, this); |
| timeval tm = {timeout_, 0}; |
| evtimer_add(timer_, &tm); |
| } |
| return Status::OK(); |
| } |
| |
| rocksdb::Status TryPopFromList() { |
| Redis::List list_db(svr_->storage_, conn_->GetNamespace()); |
| std::string elem; |
| rocksdb::Status s; |
| for (const auto &key : keys_) { |
| s = list_db.Pop(key, &elem, left_); |
| if (s.ok() || !s.IsNotFound()) { |
| break; |
| } |
| } |
| if (s.ok()) { |
| conn_->Reply(Redis::BulkString(elem)); |
| } else if (!s.IsNotFound()) { |
| conn_->Reply(Redis::Error("ERR " + s.ToString())); |
| LOG(ERROR) << "Failed to execute redis command: " << conn_->current_cmd_->Name() |
| << ", err: " << s.ToString(); |
| } |
| return s; |
| } |
| |
| static void WriteCB(bufferevent *bev, void *ctx) { |
| auto self = reinterpret_cast<CommandBPop *>(ctx); |
| auto s = self->TryPopFromList(); |
| // if pop fail ,currently we compromised to close bpop request |
| if (s.IsNotFound()) { |
| self->conn_->Reply(Redis::NilString()); |
| LOG(ERROR) << "[BPOP] Failed to execute redis command: " << self->conn_->current_cmd_->Name() |
| << ", err: another concurrent pop request must have stole the data before this bpop request" |
| << " or bpop is in a pipeline cmd list(cmd before bpop replyed trigger this writecb)"; |
| } |
| if (self->timer_ != nullptr) { |
| event_free(self->timer_); |
| self->timer_ = nullptr; |
| } |
| self->unBlockingAll(); |
| bufferevent_setcb(bev, Redis::Connection::OnRead, Redis::Connection::OnWrite, |
| Redis::Connection::OnEvent, self->conn_); |
| bufferevent_enable(bev, EV_READ); |
| } |
| |
| static void EventCB(bufferevent *bev, int16_t events, void *ctx) { |
| auto self = static_cast<CommandBPop *>(ctx); |
| if (events & (BEV_EVENT_EOF | BEV_EVENT_ERROR)) { |
| if (self->timer_ != nullptr) { |
| event_free(self->timer_); |
| self->timer_ = nullptr; |
| } |
| self->unBlockingAll(); |
| } |
| Redis::Connection::OnEvent(bev, events, self->conn_); |
| } |
| |
| static void TimerCB(int, int16_t events, void *ctx) { |
| auto self = reinterpret_cast<CommandBPop *>(ctx); |
| self->conn_->Reply(Redis::NilString()); |
| event_free(self->timer_); |
| self->timer_ = nullptr; |
| self->unBlockingAll(); |
| auto bev = self->conn_->GetBufferEvent(); |
| bufferevent_setcb(bev, Redis::Connection::OnRead, Redis::Connection::OnWrite, |
| Redis::Connection::OnEvent, self->conn_); |
| bufferevent_enable(bev, EV_READ); |
| } |
| |
| private: |
| bool left_ = false; |
| int timeout_ = 0; // second |
| std::vector<std::string> keys_; |
| Server *svr_ = nullptr; |
| Connection *conn_ = nullptr; |
| event *timer_ = nullptr; |
| |
| void unBlockingAll() { |
| for (const auto &key : keys_) { |
| svr_->UnBlockingKey(key, conn_); |
| } |
| } |
| }; |
| |
| class CommandBLPop : public CommandBPop { |
| public: |
| CommandBLPop() : CommandBPop(true) { name_ = "blpop"; } |
| }; |
| |
| class CommandBRPop : public CommandBPop { |
| public: |
| CommandBRPop() : CommandBPop(false) { name_ = "brpop"; } |
| }; |
| |
| class CommandLRem : public Commander { |
| public: |
| CommandLRem() : Commander("lrem", 4, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| count_ = std::stoi(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = list_db.Rem(args_[1], count_, args_[3], &ret); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| int count_ = 0; |
| }; |
| |
| class CommandLInsert : public Commander { |
| public: |
| CommandLInsert() : Commander("linsert", 5, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| if ((Util::ToLower(args[2]) == "before")) { |
| before_ = true; |
| } else if ((Util::ToLower(args[2]) == "after")) { |
| before_ = false; |
| } else { |
| return Status(Status::RedisParseErr, errInvalidSyntax); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = list_db.Insert(args_[1], args_[3], args_[4], before_, &ret); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| bool before_ = false; |
| }; |
| |
| class CommandLRange : public Commander { |
| public: |
| CommandLRange() : Commander("lrange", 4, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| start_ = std::stoi(args[2]); |
| stop_ = std::stoi(args[3]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> elems; |
| rocksdb::Status s = list_db.Range(args_[1], start_, stop_, &elems); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::MultiBulkString(elems); |
| return Status::OK(); |
| } |
| |
| private: |
| int start_ = 0, stop_ = 0; |
| }; |
| |
| class CommandLLen : public Commander { |
| public: |
| CommandLLen() : Commander("llen", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| uint32_t count; |
| rocksdb::Status s = list_db.Size(args_[1], &count); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(count); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandLIndex : public Commander { |
| public: |
| CommandLIndex() : Commander("lindex", 3, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| index_ = std::stoi(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| std::string elem; |
| rocksdb::Status s = list_db.Index(args_[1], index_, &elem); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::BulkString(elem); |
| return Status::OK(); |
| } |
| |
| private: |
| int index_ = 0; |
| }; |
| |
| class CommandLSet : public Commander { |
| public: |
| CommandLSet() : Commander("lset", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| index_ = std::stoi(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = list_db.Set(args_[1], index_, args_[3]); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| |
| private: |
| int index_ = 0; |
| }; |
| |
| class CommandLTrim : public Commander { |
| public: |
| CommandLTrim() : Commander("ltrim", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| start_ = std::stoi(args[2]); |
| stop_ = std::stoi(args[3]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = list_db.Trim(args_[1], start_, stop_); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| |
| private: |
| int start_ = 0, stop_ = 0; |
| }; |
| |
| class CommandRPopLPUSH : public Commander { |
| public: |
| CommandRPopLPUSH() : Commander("rpoplpush", 3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::List list_db(svr->storage_, conn->GetNamespace()); |
| std::string elem; |
| rocksdb::Status s = list_db.RPopLPush(args_[1], args_[2], &elem); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = s.IsNotFound() ? Redis::NilString() : Redis::BulkString(elem); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSAdd : public Commander { |
| public: |
| CommandSAdd() : Commander("sadd", -3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| std::vector<Slice> members; |
| for (unsigned int i = 2; i < args_.size(); i++) { |
| members.emplace_back(args_[i]); |
| } |
| int ret; |
| rocksdb::Status s = set_db.Add(args_[1], members, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSRem : public Commander { |
| public: |
| CommandSRem() : Commander("srem", -3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| std::vector<Slice> members; |
| for (unsigned int i = 2; i < args_.size(); i++) { |
| members.emplace_back(args_[i]); |
| } |
| int ret; |
| rocksdb::Status s = set_db.Remove(args_[1], members, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSCard : public Commander { |
| public: |
| CommandSCard() : Commander("scard", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| int ret; |
| rocksdb::Status s = set_db.Card(args_[1], &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSMembers : public Commander { |
| public: |
| CommandSMembers() : Commander("smembers", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> members; |
| rocksdb::Status s = set_db.Members(args_[1], &members); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::MultiBulkString(members); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSIsMember : public Commander { |
| public: |
| CommandSIsMember() : Commander("sismmeber", 3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| int ret; |
| rocksdb::Status s = set_db.IsMember(args_[1], args_[2], &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSPop : public Commander { |
| public: |
| CommandSPop() : Commander("spop", -2, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| if (args.size() == 3) { |
| count_ = std::stoi(args[2]); |
| } |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> members; |
| rocksdb::Status s = set_db.Take(args_[1], &members, count_, true); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::MultiBulkString(members); |
| return Status::OK(); |
| } |
| |
| private: |
| int count_ = 1; |
| }; |
| |
| class CommandSRandMember : public Commander { |
| public: |
| CommandSRandMember() : Commander("srandmember", -2, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| if (args.size() == 3) { |
| count_ = std::stoi(args[2]); |
| } |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> members; |
| rocksdb::Status s = set_db.Take(args_[1], &members, count_, false); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::MultiBulkString(members); |
| return Status::OK(); |
| } |
| |
| private: |
| int count_ = 1; |
| }; |
| |
| class CommandSMove : public Commander { |
| public: |
| CommandSMove() : Commander("smove", 4, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| int ret; |
| rocksdb::Status s = set_db.Move(args_[1], args_[2], args_[3], &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSDiff : public Commander { |
| public: |
| CommandSDiff() : Commander("sdiff", -2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::vector<Slice> keys; |
| for (size_t i = 1; i < args_.size(); i++) { |
| keys.emplace_back(args_[i]); |
| } |
| std::vector<std::string> members; |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| auto s = set_db.Diff(keys, &members); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::MultiBulkString(members); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSUnion : public Commander { |
| public: |
| CommandSUnion() : Commander("sunion", -2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::vector<Slice> keys; |
| for (size_t i = 1; i < args_.size(); i++) { |
| keys.emplace_back(args_[i]); |
| } |
| std::vector<std::string> members; |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| auto s = set_db.Union(keys, &members); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::MultiBulkString(members); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSInter : public Commander { |
| public: |
| CommandSInter() : Commander("sinter", -2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::vector<Slice> keys; |
| for (size_t i = 1; i < args_.size(); i++) { |
| keys.emplace_back(args_[i]); |
| } |
| std::vector<std::string> members; |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| auto s = set_db.Inter(keys, &members); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::MultiBulkString(members); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSDiffStore: public Commander { |
| public: |
| CommandSDiffStore() : Commander("sdiffstore", -3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret = 0; |
| std::vector<Slice> keys; |
| for (size_t i = 2; i < args_.size(); i++) { |
| keys.emplace_back(args_[i]); |
| } |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| auto s = set_db.DiffStore(args_[1], keys, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSUnionStore: public Commander { |
| public: |
| CommandSUnionStore() : Commander("sunionstore", -3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret = 0; |
| std::vector<Slice> keys; |
| for (size_t i = 2; i < args_.size(); i++) { |
| keys.emplace_back(args_[i]); |
| } |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| auto s = set_db.UnionStore(args_[1], keys, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSInterStore: public Commander { |
| public: |
| CommandSInterStore() : Commander("sinterstore", -3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret = 0; |
| std::vector<Slice> keys; |
| for (size_t i = 2; i < args_.size(); i++) { |
| keys.emplace_back(args_[i]); |
| } |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| auto s = set_db.InterStore(args_[1], keys, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandZAdd : public Commander { |
| public: |
| CommandZAdd() : Commander("zadd", -4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| if (args.size() % 2 != 0) { |
| return Status(Status::RedisParseErr, errInvalidSyntax); |
| } |
| |
| try { |
| for (unsigned i = 2; i < args.size(); i += 2) { |
| double score = std::stod(args[i]); |
| member_scores_.emplace_back(MemberScore{args[i + 1], score}); |
| } |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "ERR value is not a valid float"); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.Add(args_[1], 0, &member_scores_, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| std::vector<MemberScore> member_scores_; |
| }; |
| |
| class CommandZCount : public Commander { |
| public: |
| CommandZCount() : Commander("zcount", 4, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| Status s = Redis::ZSet::ParseRangeSpec(args[2], args[3], &spec_); |
| if (!s.IsOK()) { |
| return Status(Status::RedisParseErr, s.Msg()); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.Count(args_[1], spec_, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| ZRangeSpec spec_; |
| }; |
| |
| class CommandZCard : public Commander { |
| public: |
| CommandZCard() : Commander("zcard", 2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.Card(args_[1], &ret); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandZIncrBy : public Commander { |
| public: |
| CommandZIncrBy() : Commander("zincrby", 4, true) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| incr_ = std::stod(args[2]); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "value is not an double or out of range"); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| double score; |
| |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.IncrBy(args_[1], args_[3], incr_, &score); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::BulkString(std::to_string(score)); |
| return Status::OK(); |
| } |
| |
| private: |
| double incr_ = 0.0; |
| }; |
| |
| class CommandZLexCount : public Commander { |
| public: |
| CommandZLexCount() : Commander("zlexcount", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| Status s = Redis::ZSet::ParseRangeLexSpec(args[2], args[3], &spec_); |
| if (!s.IsOK()) { |
| return Status(Status::RedisParseErr, s.Msg()); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int size; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.RangeByLex(args_[1], spec_, nullptr, &size); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(size); |
| return Status::OK(); |
| } |
| |
| private: |
| ZRangeLexSpec spec_; |
| }; |
| |
| class CommandZPop : public Commander { |
| public: |
| explicit CommandZPop(bool min) : Commander("zpop", -2, true), min_(min) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| if (args.size() > 2) { |
| try { |
| count_ = std::stoi(args[2]); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| std::vector<MemberScore> memeber_scores; |
| rocksdb::Status s = zset_db.Pop(args_[1], count_, min_, &memeber_scores); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| output->append(Redis::MultiLen(memeber_scores.size() * 2)); |
| for (const auto ms : memeber_scores) { |
| output->append(Redis::BulkString(ms.member)); |
| output->append(Redis::BulkString(std::to_string(ms.score))); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| bool min_; |
| int count_ = 1; |
| }; |
| |
| class CommandZPopMin : public CommandZPop { |
| public: |
| CommandZPopMin() : CommandZPop(true) { name_ = "zpopmin"; } |
| }; |
| |
| class CommandZPopMax : public CommandZPop { |
| public: |
| CommandZPopMax() : CommandZPop(false) { name_ = "zpopmax"; } |
| }; |
| |
| class CommandZRange : public Commander { |
| public: |
| explicit CommandZRange(bool reversed = false) |
| : Commander("zrange", -4, false), reversed_(reversed) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| start_ = std::stoi(args[2]); |
| stop_ = std::stoi(args[3]); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| if (args.size() > 4 && (Util::ToLower(args[4]) == "withscores")) { |
| with_scores_ = true; |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| std::vector<MemberScore> memeber_scores; |
| uint8_t flags = !reversed_ ? 0 : ZSET_REVERSED; |
| rocksdb::Status s = |
| zset_db.Range(args_[1], start_, stop_, flags, &memeber_scores); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if (!with_scores_) { |
| output->append(Redis::MultiLen(memeber_scores.size())); |
| } else { |
| output->append(Redis::MultiLen(memeber_scores.size() * 2)); |
| } |
| for (const auto ms : memeber_scores) { |
| output->append(Redis::BulkString(ms.member)); |
| if (with_scores_) |
| output->append(Redis::BulkString(std::to_string(ms.score))); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| int start_ = 0; |
| int stop_ = 0; |
| bool reversed_; |
| bool with_scores_ = false; |
| }; |
| |
| class CommandZRevRange : public CommandZRange { |
| public: |
| CommandZRevRange() : CommandZRange(true) { name_ = "zrevrange"; } |
| }; |
| |
| class CommandZRangeByLex : public Commander { |
| public: |
| CommandZRangeByLex() : Commander("zrangebylex", -4, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| Status s = Redis::ZSet::ParseRangeLexSpec(args[2], args[3], &spec_); |
| if (!s.IsOK()) { |
| return Status(Status::RedisParseErr, s.Msg()); |
| } |
| try { |
| if (args.size() == 7 && Util::ToLower(args[4]) == "limit") { |
| spec_.offset = std::stoi(args[5]); |
| spec_.count = std::stoi(args[6]); |
| } |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int size; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> members; |
| rocksdb::Status s = zset_db.RangeByLex(args_[1], spec_, &members, &size); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::MultiBulkString(members); |
| return Status::OK(); |
| } |
| |
| private: |
| ZRangeLexSpec spec_; |
| }; |
| |
| class CommandZRangeByScore : public Commander { |
| public: |
| explicit CommandZRangeByScore(bool reversed = false) : Commander("zrangebyscore", -4, false) { |
| spec_.reversed = reversed; |
| } |
| Status Parse(const std::vector<std::string> &args) override { |
| Status s; |
| if (spec_.reversed) { |
| s = Redis::ZSet::ParseRangeSpec(args[3], args[2], &spec_); |
| } else { |
| s = Redis::ZSet::ParseRangeSpec(args[2], args[3], &spec_); |
| } |
| if (!s.IsOK()) { |
| return Status(Status::RedisParseErr, s.Msg()); |
| } |
| try { |
| size_t i = 4; |
| while (i < args.size()) { |
| if (Util::ToLower(args[i]) == "withscores") { |
| with_scores_ = true; |
| i++; |
| } else if (Util::ToLower(args[i]) == "limit" && i + 2 < args.size()) { |
| spec_.offset = std::stoi(args[i + 1]); |
| spec_.count = std::stoi(args[i + 2]); |
| i += 3; |
| } else { |
| return Status(Status::RedisParseErr, errInvalidSyntax); |
| } |
| } |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int size; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| std::vector<MemberScore> memeber_scores; |
| rocksdb::Status s = |
| zset_db.RangeByScore(args_[1], spec_, &memeber_scores, &size); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if (!with_scores_) { |
| output->append(Redis::MultiLen(memeber_scores.size())); |
| } else { |
| output->append(Redis::MultiLen(memeber_scores.size() * 2)); |
| } |
| for (const auto ms : memeber_scores) { |
| output->append(Redis::BulkString(ms.member)); |
| if (with_scores_) |
| output->append(Redis::BulkString(std::to_string(ms.score))); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| ZRangeSpec spec_; |
| bool with_scores_ = false; |
| }; |
| |
| class CommandZRank : public Commander { |
| public: |
| explicit CommandZRank(bool reversed = false) |
| : Commander("zrank", 3, false), reversed_(reversed) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int rank; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.Rank(args_[1], args_[2], reversed_, &rank); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if (rank == -1) { |
| *output = Redis::NilString(); |
| } else { |
| *output = Redis::Integer(rank); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| bool reversed_; |
| }; |
| |
| class CommandZRevRank : public CommandZRank { |
| public: |
| CommandZRevRank() : CommandZRank(true) { name_ = "zrevrank"; } |
| }; |
| |
| class CommandZRevRangeByScore : public CommandZRangeByScore { |
| public: |
| CommandZRevRangeByScore() : CommandZRangeByScore(true) { name_ = "zrevrangebyscore"; } |
| }; |
| |
| class CommandZRem : public Commander { |
| public: |
| CommandZRem() : Commander("zrem", -3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int size; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| std::vector<rocksdb::Slice> members; |
| for (unsigned i = 2; i < args_.size(); i++) { |
| members.emplace_back(args_[i]); |
| } |
| rocksdb::Status s = zset_db.Remove(args_[1], members, &size); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(size); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandZRemRangeByRank : public Commander { |
| public: |
| CommandZRemRangeByRank() : Commander("zremrangebyrank", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| start_ = std::stoi(args[2]); |
| stop_ = std::stoi(args[3]); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = |
| zset_db.RemoveRangeByRank(args_[1], start_, stop_, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| int start_ = 0; |
| int stop_ = 0; |
| }; |
| |
| class CommandZRemRangeByScore : public Commander { |
| public: |
| CommandZRemRangeByScore() : Commander("zremrangebyscore", -4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| Status s = Redis::ZSet::ParseRangeSpec(args[2], args[3], &spec_); |
| if (!s.IsOK()) { |
| return Status(Status::RedisParseErr, s.Msg()); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int size; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.RemoveRangeByScore(args_[1], spec_, &size); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(size); |
| return Status::OK(); |
| } |
| |
| private: |
| ZRangeSpec spec_; |
| }; |
| |
| class CommandZRemRangeByLex : public Commander { |
| public: |
| CommandZRemRangeByLex() : Commander("zremrangebylex", 4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| Status s = Redis::ZSet::ParseRangeLexSpec(args[2], args[3], &spec_); |
| if (!s.IsOK()) { |
| return Status(Status::RedisParseErr, s.Msg()); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int size; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.RemoveRangeByLex(args_[1], spec_, &size); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(size); |
| return Status::OK(); |
| } |
| |
| private: |
| ZRangeLexSpec spec_; |
| }; |
| |
| class CommandZScore : public Commander { |
| public: |
| CommandZScore() : Commander("zscore", 3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| double score; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.Score(args_[1], args_[2], &score); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if (s.IsNotFound()) { |
| *output = Redis::NilString(); |
| } else { |
| *output = Redis::BulkString(std::to_string(score)); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandZUnionStore : public Commander { |
| public: |
| CommandZUnionStore() : Commander("zunionstore", -4, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| numkeys_ = std::stoi(args[2]); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| if (numkeys_ > args.size() - 3) { |
| return Status(Status::RedisParseErr, errInvalidSyntax); |
| } |
| size_t j = 0; |
| while (j < numkeys_) { |
| keys_weights_.emplace_back(KeyWeight{args[j + 3], 1}); |
| j++; |
| } |
| size_t i = 3 + numkeys_; |
| while (i < args.size()) { |
| if (Util::ToLower(args[i]) == "aggregate" && i + 1 < args.size()) { |
| if (Util::ToLower(args[i + 1]) == "sum") { |
| aggregate_method_ = kAggregateSum; |
| } else if (Util::ToLower(args[i + 1]) == "min") { |
| aggregate_method_ = kAggregateMin; |
| } else if (Util::ToLower(args[i + 1]) == "max") { |
| aggregate_method_ = kAggregateMax; |
| } else { |
| return Status(Status::RedisParseErr, "aggregate para error"); |
| } |
| i += 2; |
| } else if (Util::ToLower(args[i]) == "weights" && i + numkeys_ < args.size()) { |
| size_t j = 0; |
| while (j < numkeys_) { |
| try { |
| keys_weights_[j].weight = std::stod(args[i + j + 1]); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "value is not an double or out of range"); |
| } |
| j++; |
| } |
| i += numkeys_ + 1; |
| } else { |
| return Status(Status::RedisParseErr, errInvalidSyntax); |
| } |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int size; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.UnionStore(args_[1], keys_weights_, aggregate_method_, &size); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(size); |
| return Status::OK(); |
| } |
| |
| protected: |
| size_t numkeys_ = 0; |
| std::vector<KeyWeight> keys_weights_; |
| AggregateMethod aggregate_method_ = kAggregateSum; |
| }; |
| |
| class CommandZInterStore : public CommandZUnionStore { |
| public: |
| CommandZInterStore() : CommandZUnionStore() { name_ = "zinterstore"; } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int size; |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = zset_db.InterStore(args_[1], keys_weights_, aggregate_method_, &size); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(size); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandGeoBase : public Commander { |
| public: |
| explicit CommandGeoBase(const std::string &name, int arity, bool is_write = false) |
| : Commander(name, arity, is_write) {} |
| |
| Status ParseDistanceUnit(const std::string ¶m) { |
| if (Util::ToLower(param) == "m") { |
| distance_unit_ = kDistanceMeter; |
| } else if (Util::ToLower(param) == "km") { |
| distance_unit_ = kDistanceKilometers; |
| } else if (Util::ToLower(param) == "ft") { |
| distance_unit_ = kDistanceFeet; |
| } else if (Util::ToLower(param) == "mi") { |
| distance_unit_ = kDistanceMiles; |
| } else { |
| return Status(Status::RedisParseErr, "distance unit para error"); |
| } |
| return Status::OK(); |
| } |
| |
| Status ParseLongLat(const std::string &longitude_para, |
| const std::string &latitude_para, |
| double *longitude, |
| double *latitude) { |
| try { |
| *longitude = std::stod(longitude_para); |
| *latitude = std::stod(latitude_para); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "ERR value is not a valid float"); |
| } |
| if (*longitude < GEO_LONG_MIN || *longitude > GEO_LONG_MAX || |
| *latitude < GEO_LAT_MIN || *latitude > GEO_LAT_MAX) { |
| return Status(Status::RedisParseErr, "invalid longitude,latitude pair " + longitude_para + "," + latitude_para); |
| } |
| return Status::OK(); |
| } |
| |
| double GetDistanceByUnit(double distance) { |
| return distance / GetUnitConversion(); |
| } |
| |
| double GetRadiusMeters(double radius) { |
| return radius * GetUnitConversion(); |
| } |
| |
| double GetUnitConversion() { |
| double conversion = 0; |
| switch (distance_unit_) { |
| case kDistanceMeter:conversion = 1; |
| break; |
| case kDistanceKilometers:conversion = 1000; |
| break; |
| case kDistanceFeet:conversion = 0.3048; |
| break; |
| case kDistanceMiles:conversion = 1609.34; |
| break; |
| } |
| return conversion; |
| } |
| |
| protected: |
| DistanceUnit distance_unit_ = kDistanceMeter; |
| }; |
| |
| class CommandGeoAdd : public CommandGeoBase { |
| public: |
| CommandGeoAdd() : CommandGeoBase("geoadd", -5, true) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| if ((args.size() - 5) % 3 != 0) { |
| return Status(Status::RedisParseErr, errWrongNumOfArguments); |
| } |
| for (unsigned i = 2; i < args.size(); i += 3) { |
| double longitude = 0, latitude = 0; |
| auto s = ParseLongLat(args[i], args[i + 1], &longitude, &latitude); |
| if (!s.IsOK()) return s; |
| geo_points_.emplace_back(GeoPoint{longitude, latitude, args[i + 2]}); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| int ret; |
| Redis::Geo geo_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = geo_db.Add(args_[1], &geo_points_, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| std::vector<GeoPoint> geo_points_; |
| }; |
| |
| class CommandGeoDist : public CommandGeoBase { |
| public: |
| CommandGeoDist() : CommandGeoBase("geodist", -4, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| if (args.size() == 5) { |
| auto s = ParseDistanceUnit(args[4]); |
| if (!s.IsOK()) return s; |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| double distance; |
| Redis::Geo geo_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = geo_db.Dist(args_[1], args_[2], args_[3], &distance); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| if (s.IsNotFound()) { |
| *output = Redis::NilString(); |
| } else { |
| *output = Redis::BulkString(std::to_string(GetDistanceByUnit(distance))); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandGeoHash : public Commander { |
| public: |
| CommandGeoHash() : Commander("geohash", -3, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| for (unsigned i = 2; i < args.size(); i++) { |
| members_.emplace_back(args[i]); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::vector<std::string> hashes; |
| Redis::Geo geo_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = geo_db.Hash(args_[1], members_, &hashes); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::MultiBulkString(hashes); |
| return Status::OK(); |
| } |
| |
| private: |
| std::vector<Slice> members_; |
| }; |
| |
| class CommandGeoPos : public Commander { |
| public: |
| CommandGeoPos() : Commander("geopos", -3, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| for (unsigned i = 2; i < args.size(); i++) { |
| members_.emplace_back(args[i]); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::map<std::string, GeoPoint> geo_points; |
| Redis::Geo geo_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = geo_db.Pos(args_[1], members_, &geo_points); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| std::vector<std::string> list; |
| for (const auto &member : members_) { |
| auto iter = geo_points.find(member.ToString()); |
| if (iter == geo_points.end()) { |
| list.emplace_back(Redis::NilString()); |
| } else { |
| list.emplace_back(Redis::MultiBulkString({std::to_string(iter->second.longitude), |
| std::to_string(iter->second.latitude)})); |
| } |
| } |
| *output = Redis::Array(list); |
| return Status::OK(); |
| } |
| |
| private: |
| std::vector<Slice> members_; |
| }; |
| |
| class CommandGeoRadius : public CommandGeoBase { |
| public: |
| CommandGeoRadius() : CommandGeoBase("georadius", -6, true) {} |
| explicit CommandGeoRadius(const std::string &name, int arity, bool is_write = false) |
| : CommandGeoBase(name, arity, is_write) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| auto s = ParseLongLat(args[2], args[3], &longitude_, &latitude_); |
| if (!s.IsOK()) return s; |
| try { |
| radius_ = std::stod(args[4]); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "ERR value is not a valid float"); |
| } |
| s = ParseDistanceUnit(args[5]); |
| if (!s.IsOK()) return s; |
| s = ParseRadiusExtraOption(); |
| if (!s.IsOK()) return s; |
| return Commander::Parse(args); |
| } |
| |
| Status ParseRadiusExtraOption(size_t i = 6) { |
| while (i < args_.size()) { |
| if (Util::ToLower(args_[i]) == "withcoord") { |
| with_coord_ = true; |
| i++; |
| } else if (Util::ToLower(args_[i]) == "withdist") { |
| with_dist_ = true; |
| i++; |
| } else if (Util::ToLower(args_[i]) == "withhash") { |
| with_hash_ = true; |
| i++; |
| } else if (Util::ToLower(args_[i]) == "asc") { |
| sort_ = kSortASC; |
| i++; |
| } else if (Util::ToLower(args_[i]) == "desc") { |
| sort_ = kSortDESC; |
| i++; |
| } else if (Util::ToLower(args_[i]) == "count" && i + 1 < args_.size()) { |
| try { |
| count_ = std::stoi(args_[i + 1]); |
| i += 2; |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "ERR count is not a valid int"); |
| } |
| } else if (is_write_ && (Util::ToLower(args_[i]) == "store" || Util::ToLower(args_[i]) == "storedist") |
| && i + 1 < args_.size()) { |
| store_key_ = args_[i + 1]; |
| if (Util::ToLower(args_[i]) == "storedist") { |
| store_distance_ = true; |
| } |
| i += 2; |
| } else { |
| return Status(Status::RedisParseErr, errInvalidSyntax); |
| } |
| } |
| /* Trap options not compatible with STORE and STOREDIST. */ |
| if (!store_key_.empty() && (with_dist_ || with_hash_ || with_coord_)) { |
| return Status(Status::RedisParseErr, |
| "STORE option in GEORADIUS is not compatible with WITHDIST, WITHHASH and WITHCOORDS options"); |
| } |
| /* COUNT without ordering does not make much sense, force ASC |
| * ordering if COUNT was specified but no sorting was requested. |
| * */ |
| if (count_ != 0 && sort_ == kSortNone) { |
| sort_ = kSortASC; |
| } |
| return Status::OK(); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::vector<GeoPoint> geo_points; |
| Redis::Geo geo_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = geo_db.Radius(args_[1], longitude_, latitude_, GetRadiusMeters(radius_), |
| count_, |
| sort_, |
| store_key_, |
| store_distance_, GetUnitConversion(), &geo_points); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = GenerateOutput(geo_points); |
| return Status::OK(); |
| } |
| |
| std::string GenerateOutput(const std::vector<GeoPoint> &geo_points) { |
| int result_length = geo_points.size(); |
| int returned_items_count = (count_ == 0 || result_length < count_) ? result_length : count_; |
| std::vector<std::string> list; |
| for (int i = 0; i < returned_items_count; i++) { |
| auto geo_point = geo_points[i]; |
| if (!with_coord_ && !with_hash_ && !with_dist_) { |
| list.emplace_back(Redis::BulkString(geo_point.member)); |
| } else { |
| std::vector<std::string> one; |
| one.emplace_back(Redis::BulkString(geo_point.member)); |
| if (with_dist_) { |
| one.emplace_back(Redis::BulkString(std::to_string(GetDistanceByUnit(geo_point.dist)))); |
| } |
| if (with_hash_) { |
| one.emplace_back(Redis::BulkString(std::to_string(geo_point.score))); |
| } |
| if (with_coord_) { |
| one.emplace_back(Redis::MultiBulkString({std::to_string(geo_point.longitude), |
| std::to_string(geo_point.latitude)})); |
| } |
| list.emplace_back(Redis::Array(one)); |
| } |
| } |
| return Redis::Array(list); |
| } |
| |
| protected: |
| double radius_ = 0; |
| bool with_coord_ = false; |
| bool with_dist_ = false; |
| bool with_hash_ = false; |
| int count_ = 0; |
| DistanceSort sort_ = kSortNone; |
| std::string store_key_; |
| bool store_distance_ = false; |
| |
| private: |
| double longitude_ = 0; |
| double latitude_ = 0; |
| }; |
| |
| class CommandGeoRadiusByMember : public CommandGeoRadius { |
| public: |
| CommandGeoRadiusByMember() : CommandGeoRadius("georadiusbymember", -5, true) {} |
| explicit CommandGeoRadiusByMember(const std::string &name, int arity, bool is_write = false) |
| : CommandGeoRadius(name, arity, is_write) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| radius_ = std::stod(args[3]); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "ERR value is not a valid float"); |
| } |
| auto s = ParseDistanceUnit(args[4]); |
| if (!s.IsOK()) return s; |
| s = ParseRadiusExtraOption(5); |
| if (!s.IsOK()) return s; |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::vector<GeoPoint> geo_points; |
| Redis::Geo geo_db(svr->storage_, conn->GetNamespace()); |
| rocksdb::Status s = geo_db.RadiusByMember(args_[1], args_[2], GetRadiusMeters(radius_), |
| count_, |
| sort_, |
| store_key_, |
| store_distance_, GetUnitConversion(), &geo_points); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = GenerateOutput(geo_points); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandGeoRadiusReadonly : public CommandGeoRadius { |
| public: |
| CommandGeoRadiusReadonly() : CommandGeoRadius("georadius_ro", -6, false) {} |
| }; |
| |
| class CommandGeoRadiusByMemberReadonly : public CommandGeoRadiusByMember { |
| public: |
| CommandGeoRadiusByMemberReadonly() : CommandGeoRadiusByMember("georadius_ro", -5, false) {} |
| }; |
| |
| class CommandSortedintAdd : public Commander { |
| public: |
| CommandSortedintAdd() : Commander("siadd", -3, true) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| for (unsigned i = 2; i < args.size(); i++) { |
| auto id = std::stoull(args[i]); |
| ids_.emplace_back(id); |
| } |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Sortedint sortedint_db(svr->storage_, conn->GetNamespace()); |
| int ret; |
| rocksdb::Status s = sortedint_db.Add(args_[1], ids_, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| std::vector<uint64_t> ids_; |
| }; |
| |
| class CommandSortedintRem : public Commander { |
| public: |
| CommandSortedintRem() : Commander("sirem", -3, true) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| for (unsigned i = 2; i < args.size(); i++) { |
| auto id = std::stoull(args[i]); |
| ids_.emplace_back(id); |
| } |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Sortedint sortedint_db(svr->storage_, conn->GetNamespace()); |
| int ret; |
| rocksdb::Status s = sortedint_db.Remove(args_[1], ids_, &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| |
| private: |
| std::vector<uint64_t> ids_; |
| }; |
| |
| class CommandSortedintCard : public Commander { |
| public: |
| CommandSortedintCard() : Commander("sicard", 2, false) {} |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Sortedint sortedint_db(svr->storage_, conn->GetNamespace()); |
| int ret; |
| rocksdb::Status s = sortedint_db.Card(args_[1], &ret); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = Redis::Integer(ret); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSortedintRange : public Commander { |
| public: |
| explicit CommandSortedintRange(bool reversed = false) : Commander("sirange", -4, false) { |
| reversed_ = reversed; |
| } |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| offset_ = std::stoi(args[2]); |
| limit_ = std::stoi(args[3]); |
| if (args.size() == 6) { |
| if (args[4] != "cursor") { |
| return Status(Status::RedisParseErr, errInvalidSyntax); |
| } |
| cursor_id_ = std::stoull(args[5]); |
| } |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Sortedint sortedint_db(svr->storage_, conn->GetNamespace()); |
| std::vector<uint64_t> ids; |
| rocksdb::Status s = sortedint_db.Range(args_[1], cursor_id_, offset_, limit_, reversed_, &ids); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| output->append(Redis::MultiLen(ids.size())); |
| for (const auto id : ids) { |
| output->append(Redis::BulkString(std::to_string(id))); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| uint64_t cursor_id_ = 0; |
| uint64_t offset_ = 0; |
| uint64_t limit_ = 20; |
| bool reversed_ = false; |
| }; |
| |
| class CommandSortedintRevRange : public CommandSortedintRange { |
| public: |
| CommandSortedintRevRange() : CommandSortedintRange(true) { name_ = "sirevrange"; } |
| }; |
| |
| class CommandInfo : public Commander { |
| public: |
| CommandInfo() : Commander("info", -1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::string section = "all"; |
| if (args_.size() == 2) { |
| section = Util::ToLower(args_[1]); |
| } |
| std::string info; |
| svr->GetInfo(conn->GetNamespace(), section, &info); |
| *output = Redis::BulkString(info); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandCompact : public Commander { |
| public: |
| CommandCompact() : Commander("compact", 1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| auto ns = conn->GetNamespace(); |
| std::string begin_key, end_key; |
| if (ns != kDefaultNamespace) { |
| Redis::Database redis_db(svr->storage_, conn->GetNamespace()); |
| std::string prefix; |
| ComposeNamespaceKey(ns, "", &prefix); |
| auto s = redis_db.FindKeyRangeWithPrefix(prefix, &begin_key, &end_key); |
| if (!s.ok()) { |
| if (s.IsNotFound()) { |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| } |
| Status s = svr->AsyncCompactDB(begin_key, end_key); |
| if (!s.IsOK()) return s; |
| *output = Redis::SimpleString("OK"); |
| LOG(INFO) << "Commpact was triggered by manual with executed success"; |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandBGSave: public Commander { |
| public: |
| CommandBGSave() : Commander("bgsave", 1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (!conn->IsAdmin()) { |
| *output = Redis::Error(errAdministorPermissionRequired); |
| return Status::OK(); |
| } |
| Status s = svr->AsyncBgsaveDB(); |
| if (!s.IsOK()) return s; |
| *output = Redis::SimpleString("OK"); |
| LOG(INFO) << "BGSave was triggered by manual with executed success"; |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandDBSize : public Commander { |
| public: |
| CommandDBSize() : Commander("dbsize", -1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::string ns = conn->GetNamespace(); |
| if (args_.size() == 1) { |
| KeyNumStats stats; |
| svr->GetLastestKeyNumStats(ns, &stats); |
| *output = Redis::Integer(stats.n_key); |
| } else if (args_.size() == 2 && args_[1] == "scan") { |
| Status s = svr->AsyncScanDBSize(ns); |
| if (s.IsOK()) { |
| *output = Redis::SimpleString("OK"); |
| } else { |
| *output = Redis::Error(s.Msg()); |
| } |
| } else { |
| *output = Redis::Error("DBSIZE subcommand only supports scan"); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandPublish : public Commander { |
| public: |
| CommandPublish() : Commander("publish", 3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::PubSub pubsub_db(svr->storage_); |
| auto s = pubsub_db.Publish(args_[1], args_[2]); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| |
| int receivers = svr->PublishMessage(args_[1], args_[2]); |
| *output = Redis::Integer(receivers); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSubscribe : public Commander { |
| public: |
| CommandSubscribe() : Commander("subcribe", -2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| for (unsigned i = 1; i < args_.size(); i++) { |
| conn->SubscribeChannel(args_[i]); |
| output->append(Redis::MultiLen(3)); |
| output->append(Redis::BulkString("subscribe")); |
| output->append(Redis::BulkString(args_[i])); |
| output->append(Redis::Integer(conn->SubscriptionsCount())); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandUnSubscribe : public Commander { |
| public: |
| CommandUnSubscribe() : Commander("unsubcribe", -1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (args_.size() > 1) { |
| conn->UnSubscribeChannel(args_[1]); |
| } else { |
| conn->UnSubscribeAll(); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandPSubscribe : public Commander { |
| public: |
| CommandPSubscribe() : Commander("psubcribe", -2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| for (unsigned i = 1; i < args_.size(); i++) { |
| conn->PSubscribeChannel(args_[i]); |
| output->append(Redis::MultiLen(3)); |
| output->append(Redis::BulkString("psubscribe")); |
| output->append(Redis::BulkString(args_[i])); |
| output->append(Redis::Integer(conn->PSubscriptionsCount())); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandPUnSubscribe : public Commander { |
| public: |
| CommandPUnSubscribe() : Commander("punsubcribe", -1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (args_.size() > 1) { |
| conn->PUnSubscribeChannel(args_[1]); |
| } else { |
| conn->PUnSubscribeAll(); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandPubSub : public Commander { |
| public: |
| CommandPubSub() : Commander("pubsub", -2, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| subcommand_ = Util::ToLower(args[1]); |
| if (subcommand_ == "numpat" && args.size() == 2) { |
| return Status::OK(); |
| } |
| if ((subcommand_ == "numsub") && args.size() >= 2) { |
| if (args.size() > 2) { |
| channels_ = std::vector<std::string>(args.begin() + 2, args.end()); |
| } |
| return Status::OK(); |
| } |
| if ((subcommand_ == "channels") && args.size() <= 3) { |
| if (args.size() == 3) { |
| pattern_ = args[2]; |
| } |
| return Status::OK(); |
| } |
| return Status(Status::RedisInvalidCmd, |
| "ERR Unknown subcommand or wrong number of arguments"); |
| } |
| |
| Status Execute(Server *srv, Connection *conn, std::string *output) override { |
| if (subcommand_ == "numpat") { |
| *output = Redis::Integer(srv->GetPubSubPatternSize()); |
| return Status::OK(); |
| } else if (subcommand_ == "numsub") { |
| std::vector<ChannelSubscribeNum> channel_subscribe_nums; |
| srv->ListChannelSubscribeNum(channels_, &channel_subscribe_nums); |
| output->append(Redis::MultiLen(channel_subscribe_nums.size() * 2)); |
| for (const auto chan_subscribe_num : channel_subscribe_nums) { |
| output->append(Redis::BulkString(chan_subscribe_num.channel)); |
| output->append(Redis::Integer(chan_subscribe_num.subscribe_num)); |
| } |
| return Status::OK(); |
| } else if (subcommand_ == "channels") { |
| std::vector<std::string> channels; |
| srv->GetChannelsByPattern(pattern_, &channels); |
| *output = Redis::MultiBulkString(channels); |
| return Status::OK(); |
| } |
| |
| return Status(Status::RedisInvalidCmd, |
| "ERR Unknown subcommand or wrong number of arguments"); |
| } |
| |
| private: |
| std::string pattern_; |
| std::vector<std::string> channels_; |
| std::string subcommand_; |
| }; |
| |
| class CommandSlaveOf : public Commander { |
| public: |
| CommandSlaveOf() : Commander("slaveof", 3, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| host_ = args[1]; |
| auto port = args[2]; |
| if (Util::ToLower(host_) == "no" && Util::ToLower(port) == "one") { |
| host_.clear(); |
| return Status::OK(); |
| } |
| try { |
| auto p = std::stoul(port); |
| if (p > UINT32_MAX) { |
| throw std::overflow_error("port out of range"); |
| } |
| port_ = static_cast<uint32_t>(p); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "port should be number"); |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (!conn->IsAdmin()) { |
| *output = Redis::Error(errAdministorPermissionRequired); |
| return Status::OK(); |
| } |
| Status s; |
| if (host_.empty()) { |
| s = svr->RemoveMaster(); |
| if (s.IsOK()) { |
| *output = Redis::SimpleString("OK"); |
| LOG(WARNING) << "MASTER MODE enabled (user request from '" << conn->GetAddr() << "')"; |
| } |
| } else { |
| s = svr->AddMaster(host_, port_); |
| if (s.IsOK()) { |
| *output = Redis::SimpleString("OK"); |
| LOG(WARNING) << "SLAVE OF " << host_ << ":" << port_ |
| << " enabled (user request from '" << conn->GetAddr() << "')"; |
| } else { |
| LOG(ERROR) << "SLAVE OF " << host_ << ":" << port_ |
| << " (user request from '" << conn->GetAddr() << "') encounter error: " << s.Msg(); |
| } |
| } |
| return s; |
| } |
| |
| private: |
| std::string host_; |
| uint32_t port_ = 0; |
| }; |
| |
| class CommandStats: public Commander { |
| public: |
| CommandStats() : Commander("stats", 1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::string stats_json = svr->GetRocksDBStatsJson(); |
| *output = Redis::BulkString(stats_json); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandPSync : public Commander { |
| public: |
| CommandPSync() : Commander("psync", 2, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| try { |
| auto s = std::stoull(args[1]); |
| next_repl_seq = static_cast<rocksdb::SequenceNumber>(s); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "value is not an unsigned long long or out of range"); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| LOG(INFO) << "Slave " << conn->GetAddr() << " asks for synchronization" |
| << " with next sequence: " << next_repl_seq |
| << ", and local sequence: " << svr->storage_->LatestSeq(); |
| if (!checkWALBoundary(svr->storage_, next_repl_seq).IsOK()) { |
| svr->stats_.IncrPSyncErrCounter(); |
| *output = "sequence out of range, please use fullsync"; |
| return Status(Status::RedisExecErr, *output); |
| } |
| svr->stats_.IncrPSyncOKCounter(); |
| Status s = svr->AddSlave(conn, next_repl_seq); |
| if (!s.IsOK()) return s; |
| LOG(INFO) << "New slave: " << conn->GetAddr() << " was added, start increment syncing"; |
| conn->EnableFlag(Redis::Connection::kSlave); |
| // server would spawn a new thread to sync the batch, |
| // and connection would be took over, so should never trigger any event in worker thread |
| conn->Detach(); |
| write(conn->GetFD(), "+OK\r\n", 5); |
| return Status::OK(); |
| } |
| |
| private: |
| rocksdb::SequenceNumber next_repl_seq = 0; |
| |
| // Return OK if the seq is in the range of the current WAL |
| Status checkWALBoundary(Engine::Storage *storage, |
| rocksdb::SequenceNumber seq) { |
| if (seq == storage->LatestSeq() + 1) { |
| return Status::OK(); |
| } |
| // Upper bound |
| if (seq > storage->LatestSeq() + 1) { |
| return Status(Status::NotOK); |
| } |
| // Lower bound |
| std::unique_ptr<rocksdb::TransactionLogIterator> iter; |
| auto s = storage->GetWALIter(seq, &iter); |
| if (s.IsOK() && iter->Valid()) { |
| auto batch = iter->GetBatch(); |
| if (seq < batch.sequence) { |
| return Status(Status::NotOK); |
| } |
| return Status::OK(); |
| } |
| return Status(Status::NotOK); |
| } |
| }; |
| |
| class CommandPerfLog : public Commander { |
| public: |
| CommandPerfLog() : Commander("perflog", -2, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| subcommand_ = Util::ToLower(args[1]); |
| if (subcommand_ != "reset" && subcommand_ != "get" && subcommand_ != "len") { |
| return Status(Status::NotOK, "PERFLOG subcommand must be one of RESET, LEN, GET"); |
| } |
| if (subcommand_ == "get" && args.size() >= 3) { |
| if (args[2] == "*") { |
| cnt_ = 0; |
| } else { |
| Status s = Util::StringToNum(args[2], &cnt_); |
| return s; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status Execute(Server *srv, Connection *conn, std::string *output) override { |
| auto perf_log = srv->GetPerfLog(); |
| if (subcommand_ == "len") { |
| *output = Redis::Integer(static_cast<int64_t>(perf_log->Size())); |
| } else if (subcommand_ == "reset") { |
| perf_log->Reset(); |
| *output = Redis::SimpleString("OK"); |
| } else if (subcommand_ == "get") { |
| *output = perf_log->GetLatestEntries(cnt_); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| std::string subcommand_; |
| int64_t cnt_ = 10; |
| }; |
| |
| class CommandSlowlog : public Commander { |
| public: |
| CommandSlowlog() : Commander("slowlog", -2, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| subcommand_ = Util::ToLower(args[1]); |
| if (subcommand_ != "reset" && subcommand_ != "get" && subcommand_ != "len") { |
| return Status(Status::NotOK, "SLOWLOG subcommand must be one of RESET, LEN, GET"); |
| } |
| if (subcommand_ == "get" && args.size() >= 3) { |
| if (args[2] == "*") { |
| cnt_ = 0; |
| } else { |
| Status s = Util::StringToNum(args[2], &cnt_); |
| return s; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status Execute(Server *srv, Connection *conn, std::string *output) override { |
| auto slowlog = srv->GetSlowLog(); |
| if (subcommand_ == "reset") { |
| slowlog->Reset(); |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } else if (subcommand_ == "len") { |
| *output = Redis::Integer(static_cast<int64_t>(slowlog->Size())); |
| return Status::OK(); |
| } else if (subcommand_ == "get") { |
| *output = slowlog->GetLatestEntries(cnt_); |
| return Status::OK(); |
| } |
| return Status(Status::NotOK, "SLOWLOG subcommand must be one of RESET, LEN, GET"); |
| } |
| |
| private: |
| std::string subcommand_; |
| int64_t cnt_ = 10; |
| }; |
| |
| class CommandClient : public Commander { |
| public: |
| CommandClient() : Commander("client", -2, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| subcommand_ = Util::ToLower(args[1]); |
| // subcommand: getname id kill list setname |
| if ((subcommand_ == "id" || subcommand_ == "getname" || subcommand_ == "list") && args.size() == 2) { |
| return Status::OK(); |
| } |
| if ((subcommand_ == "setname") && args.size() == 3) { |
| conn_name_ = args[2]; |
| return Status::OK(); |
| } |
| if ((subcommand_ == "kill")) { |
| if (args.size() == 2) { |
| return Status(Status::RedisParseErr, errInvalidSyntax); |
| } else if (args.size() == 3) { |
| addr_ = args[2]; |
| new_format_ = false; |
| return Status::OK(); |
| } |
| |
| uint i = 2; |
| new_format_ = true; |
| while (i < args.size()) { |
| bool moreargs = i < args.size(); |
| if (args[i] == "addr" && moreargs) { |
| addr_ = args[i+1]; |
| } else if (args[i] == "id" && moreargs) { |
| try { |
| id_ = std::stoll(args[i+1]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| } else if (args[i] == "skipme" && moreargs) { |
| if (args[i+1] == "yes") { |
| skipme_ = true; |
| } else if (args[i+1] == "no") { |
| skipme_ = false; |
| } else { |
| return Status(Status::RedisParseErr, errInvalidSyntax); |
| } |
| } else { |
| return Status(Status::RedisParseErr, errInvalidSyntax); |
| } |
| i += 2; |
| } |
| return Status::OK(); |
| } |
| return Status(Status::RedisInvalidCmd, |
| "Syntax error, try CLIENT LIST|KILL ip:port|GETNAME|SETNAME"); |
| } |
| |
| Status Execute(Server *srv, Connection *conn, std::string *output) override { |
| if (subcommand_ == "list") { |
| *output = Redis::BulkString(srv->GetClientsStr()); |
| return Status::OK(); |
| } else if (subcommand_ == "setname") { |
| conn->SetName(conn_name_); |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } else if (subcommand_ == "getname") { |
| std::string name = conn->GetName(); |
| *output = name== ""? Redis::NilString(): Redis::BulkString(name); |
| return Status::OK(); |
| } else if (subcommand_ == "id") { |
| *output = Redis::Integer(conn->GetID()); |
| return Status::OK(); |
| } else if (subcommand_ == "kill") { |
| int64_t killed = 0; |
| srv->KillClient(&killed, addr_, id_, skipme_, conn); |
| if (new_format_) { |
| *output = Redis::Integer(killed); |
| } else { |
| if (killed == 0) |
| *output = Redis::Error("No such client"); |
| else |
| *output = Redis::SimpleString("OK"); |
| } |
| return Status::OK(); |
| } |
| |
| return Status(Status::RedisInvalidCmd, |
| "Syntax error, try CLIENT LIST|KILL ip:port|GETNAME|SETNAME"); |
| } |
| |
| private: |
| std::string addr_; |
| std::string conn_name_; |
| std::string subcommand_; |
| bool skipme_ = false; |
| uint64_t id_ = 0; |
| bool new_format_ = true; |
| }; |
| |
| class CommandMonitor : public Commander { |
| public: |
| CommandMonitor() : Commander("monitor", 1, false) {} |
| Status Execute(Server *srv, Connection *conn, std::string *output) override { |
| conn->Owner()->BecomeMonitorConn(conn); |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandShutdown : public Commander { |
| public: |
| CommandShutdown() : Commander("shutdown", -1, false) {} |
| Status Execute(Server *srv, Connection *conn, std::string *output) override { |
| if (!conn->IsAdmin()) { |
| *output = Redis::Error(errAdministorPermissionRequired); |
| return Status::OK(); |
| } |
| if (!srv->IsStopped()) { |
| LOG(INFO) << "bye bye"; |
| srv->Stop(); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandQuit : public Commander { |
| public: |
| CommandQuit() : Commander("quit", -1, false) {} |
| Status Execute(Server *srv, Connection *conn, std::string *output) override { |
| conn->EnableFlag(Redis::Connection::kCloseAfterReply); |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandScanBase : public Commander { |
| public: |
| explicit CommandScanBase(const std::string &name, int arity, bool is_write = false) |
| : Commander(name, arity, is_write) {} |
| Status ParseMatchAndCountParam(const std::string &type, std::string value) { |
| if (type == "match") { |
| prefix = std::move(value); |
| if (!prefix.empty() && prefix[prefix.size() - 1] == '*') { |
| prefix = prefix.substr(0, prefix.size() - 1); |
| return Status::OK(); |
| } |
| return Status(Status::RedisParseErr, "only keys prefix match was supported"); |
| } else if (type == "count") { |
| try { |
| limit = std::stoi(value); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "ERR count param should be type int"); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| void ParseCursor(const std::string ¶m) { |
| cursor = param; |
| if (cursor == "0") { |
| cursor = std::string(); |
| } |
| } |
| |
| std::string GenerateOutput(const std::vector<std::string> &keys) { |
| std::vector<std::string> list; |
| if (!keys.empty()) { |
| list.emplace_back(Redis::BulkString(keys.back())); |
| } else { |
| list.emplace_back(Redis::BulkString("0")); |
| } |
| |
| list.emplace_back(Redis::MultiBulkString(keys)); |
| |
| return Redis::Array(list); |
| } |
| |
| protected: |
| std::string cursor; |
| std::string prefix; |
| int limit = 20; |
| }; |
| |
| class CommandSubkeyScanBase : public CommandScanBase { |
| public: |
| explicit CommandSubkeyScanBase(const std::string &name, int arity, bool is_write = false) |
| : CommandScanBase(name, arity, is_write) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| if (args.size() % 2 == 0) { |
| return Status(Status::RedisParseErr, errWrongNumOfArguments); |
| } |
| key = args[1]; |
| ParseCursor(args[2]); |
| if (args.size() >= 5) { |
| Status s = ParseMatchAndCountParam(Util::ToLower(args[3]), args_[4]); |
| if (!s.IsOK()) { |
| return s; |
| } |
| } |
| if (args.size() >= 7) { |
| Status s = ParseMatchAndCountParam(Util::ToLower(args[5]), args_[6]); |
| if (!s.IsOK()) { |
| return s; |
| } |
| } |
| return Commander::Parse(args); |
| } |
| |
| std::string GenerateOutput(const std::vector<std::string> &fields, const std::vector<std::string> &values) { |
| std::vector<std::string> list; |
| if (!fields.empty()) { |
| list.emplace_back(Redis::BulkString(fields.back())); |
| } else { |
| list.emplace_back(Redis::BulkString("0")); |
| } |
| std::vector<std::string> fvs; |
| auto items_count = fields.size(); |
| if (items_count > 0) { |
| for (size_t i = 0; i < items_count; i++) { |
| fvs.emplace_back(fields[i]); |
| fvs.emplace_back(values[i]); |
| } |
| list.emplace_back(Redis::MultiBulkString(fvs)); |
| } |
| return Redis::Array(list); |
| } |
| |
| protected: |
| std::string key; |
| }; |
| |
| class CommandScan : public CommandScanBase { |
| public: |
| CommandScan() : CommandScanBase("scan", -2, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| if (args.size() % 2 != 0) { |
| return Status(Status::RedisParseErr, errWrongNumOfArguments); |
| } |
| |
| ParseCursor(args[1]); |
| if (args.size() >= 4) { |
| Status s = ParseMatchAndCountParam(Util::ToLower(args[2]), args_[3]); |
| if (!s.IsOK()) { |
| return s; |
| } |
| } |
| if (args.size() >= 6) { |
| Status s = ParseMatchAndCountParam(Util::ToLower(args[4]), args_[5]); |
| if (!s.IsOK()) { |
| return s; |
| } |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Database redis_db(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> keys; |
| auto s = redis_db.Scan(cursor, limit, prefix, &keys); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| |
| *output = GenerateOutput(keys); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandHScan : public CommandSubkeyScanBase { |
| public: |
| CommandHScan() : CommandSubkeyScanBase("hscan", -3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Hash hash_db(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> fields; |
| std::vector<std::string> values; |
| auto s = hash_db.Scan(key, cursor, limit, prefix, &fields, &values); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| *output = GenerateOutput(fields, values); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSScan : public CommandSubkeyScanBase { |
| public: |
| CommandSScan() : CommandSubkeyScanBase("sscan", -3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Set set_db(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> members; |
| auto s = set_db.Scan(key, cursor, limit, prefix, &members); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| |
| *output = CommandScanBase::GenerateOutput(members); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandZScan : public CommandSubkeyScanBase { |
| public: |
| CommandZScan() : CommandSubkeyScanBase("zscan", -3, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::ZSet zset_db(svr->storage_, conn->GetNamespace()); |
| std::vector<std::string> members; |
| std::vector<double> scores; |
| auto s = zset_db.Scan(key, cursor, limit, prefix, &members, &scores); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| std::vector<std::string> score_strings; |
| for (const auto &score : scores) { |
| score_strings.emplace_back(std::to_string(score)); |
| } |
| *output = GenerateOutput(members, score_strings); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandRandomKey : public Commander { |
| public: |
| CommandRandomKey() : Commander("randomkey", 1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| std::string key; |
| auto cursor = svr->GetLastRandomKeyCursor(); |
| Redis::Database redis(svr->storage_, conn->GetNamespace()); |
| redis.RandomKey(cursor, &key); |
| svr->SetLastRandomKeyCursor(key); |
| *output = Redis::BulkString(key); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandReplConf : public Commander { |
| public: |
| CommandReplConf() : Commander("replconf", -3, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| if (args.size() % 2 == 0) { |
| return Status(Status::RedisParseErr, errWrongNumOfArguments); |
| } |
| if (args.size() >= 3) { |
| Status s = ParseParam(Util::ToLower(args[1]), args_[2]); |
| if (!s.IsOK()) { |
| return s; |
| } |
| } |
| if (args.size() >= 5) { |
| Status s = ParseParam(Util::ToLower(args[3]), args_[4]); |
| if (!s.IsOK()) { |
| return s; |
| } |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status ParseParam(const std::string &option, const std::string &value) { |
| if (option == "listening-port") { |
| try { |
| auto p = std::stoul(value); |
| if (p > UINT32_MAX) { |
| throw std::overflow_error("listening-port out of range"); |
| } |
| port_ = static_cast<uint32_t>(p); |
| } catch (const std::exception &e) { |
| return Status(Status::RedisParseErr, "listening-port should be number"); |
| } |
| } else { |
| return Status(Status::RedisParseErr, "unknown option"); |
| } |
| return Status::OK(); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (port_ != 0) { |
| conn->SetListeningPort(port_); |
| } |
| *output = Redis::SimpleString("OK"); |
| return Status::OK(); |
| } |
| |
| private: |
| uint32_t port_ = 0; |
| }; |
| |
| class CommandFetchMeta : public Commander { |
| public: |
| CommandFetchMeta() : Commander("_fetch_meta", 1, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| return Status::OK(); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| uint64_t file_size; |
| rocksdb::BackupID meta_id; |
| int fd; |
| auto s = Engine::Storage::BackupManager::OpenLatestMeta( |
| svr->storage_, &fd, &meta_id, &file_size); |
| if (!s.IsOK()) { |
| LOG(ERROR) << "Failed to open latest meta, err: " << s.Msg(); |
| return Status(Status::DBBackupFileErr, "can't create db backup"); |
| } |
| // Send the meta ID |
| conn->Reply(std::to_string(meta_id) + CRLF); |
| // Send meta file size |
| conn->Reply(std::to_string(file_size) + CRLF); |
| // Send meta content |
| conn->SendFile(fd); |
| svr->stats_.IncrFullSyncCounter(); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandFetchFile : public Commander { |
| public: |
| CommandFetchFile() : Commander("_fetch_file", 2, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| path_ = args[1]; |
| return Status::OK(); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| uint64_t file_size = 0; |
| auto fd = Engine::Storage::BackupManager::OpenDataFile(svr->storage_, path_, |
| &file_size); |
| if (fd < 0) return Status(Status::DBBackupFileErr); |
| conn->Reply(std::to_string(file_size) + CRLF); |
| conn->SendFile(fd); |
| return Status::OK(); |
| } |
| |
| private: |
| std::string path_; |
| }; |
| |
| class CommandDBName : public Commander { |
| public: |
| CommandDBName() : Commander("_db_name", 1, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| return Status::OK(); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| conn->Reply(svr->storage_->GetName() + CRLF); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSlotsInfo : public Commander { |
| public: |
| CommandSlotsInfo() : Commander("slotsinfo", -1, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| if (args.size() > 1) { |
| try { |
| start_ = std::stoi(args[1]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| } |
| if (args.size() > 2) { |
| try { |
| count_ = std::stoi(args[2]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Slot slot_db(svr->storage_); |
| |
| std::vector<SlotCount> slot_counts; |
| rocksdb::Status s = |
| slot_db.GetInfo(start_, count_, &slot_counts); |
| if (!s.ok()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| |
| output->append(Redis::MultiLen(slot_counts.size())); |
| for (const auto sc : slot_counts) { |
| output->append(Redis::MultiLen(2)); |
| output->append(Redis::Integer(sc.slot_num)); |
| output->append(Redis::Integer(sc.count)); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| int start_ = 0; |
| int count_ = HASH_SLOTS_SIZE - 1; |
| }; |
| |
| class CommandSlotsScan : public CommandScanBase { |
| public: |
| CommandSlotsScan() : CommandScanBase("slotsscan", -3, false) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| if (args.size() % 2 != 1) { |
| return Status(Status::RedisParseErr, errWrongNumOfArguments); |
| } |
| try { |
| slot_num_ = std::stoi(args[1]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| ParseCursor(args[2]); |
| if (args.size() == 5) { |
| Status s = ParseMatchAndCountParam(Util::ToLower(args[3]), args_[4]); |
| if (!s.IsOK()) { |
| return s; |
| } |
| } |
| return Commander::Parse(args); |
| } |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Slot slot_db(svr->storage_); |
| std::vector<std::string> keys; |
| auto s = slot_db.Scan(slot_num_, cursor, limit, &keys); |
| if (!s.ok() && !s.IsNotFound()) { |
| return Status(Status::RedisExecErr, s.ToString()); |
| } |
| |
| *output = GenerateOutput(keys); |
| return Status::OK(); |
| } |
| |
| private: |
| uint32_t slot_num_ = 0; |
| }; |
| |
| class CommandSlotsDel : public Commander { |
| public: |
| CommandSlotsDel() : Commander("slotsdel", -2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Slot slot_db(svr->storage_); |
| std::vector<uint32_t> slot_nums; |
| uint32_t slot_num; |
| for (size_t i = 1; i < args_.size(); i++) { |
| try { |
| slot_num = std::stoi(args_[i]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| slot_nums.emplace_back(slot_num); |
| slot_db.Del(slot_num); |
| } |
| |
| output->append(Redis::MultiLen(slot_nums.size())); |
| for (const auto sn : slot_nums) { |
| output->append(Redis::MultiLen(2)); |
| output->append(Redis::Integer(sn)); |
| output->append(Redis::Integer(0)); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSlotsMgrtBase : public Commander { |
| public: |
| explicit CommandSlotsMgrtBase(const std::string &name, int arity, bool is_write = false) |
| : Commander(name, arity, is_write) {} |
| Status Parse(const std::vector<std::string> &args) override { |
| if (args.size() != static_cast<size_t>(arity_)) { |
| return Status(Status::RedisParseErr, errWrongNumOfArguments); |
| } |
| host = args[1]; |
| try { |
| port = std::stoi(args[2]); |
| timeout = std::stoull(args[3]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| return Commander::Parse(args); |
| } |
| |
| protected: |
| std::string host; |
| uint32_t port = 0; |
| uint64_t timeout = 0; |
| }; |
| |
| class CommandSlotsMgrtSlot : public CommandSlotsMgrtBase { |
| public: |
| CommandSlotsMgrtSlot() : CommandSlotsMgrtBase("slotsmgrtslot", 5, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Slot slot_db(svr->storage_); |
| |
| uint32_t slot_num; |
| try { |
| slot_num = std::stoi(args_[4]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| auto s = slot_db.MigrateSlotRandomOne(host, port, timeout, slot_num); |
| uint64_t size; |
| auto st = slot_db.Size(slot_num, &size); |
| |
| output->append(Redis::MultiLen(2)); |
| output->append(s.IsOK() ? Redis::Integer(1) : Redis::Integer(0)); |
| output->append(Redis::Integer(size)); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSlotsMgrtOne : public CommandSlotsMgrtBase { |
| public: |
| CommandSlotsMgrtOne() : CommandSlotsMgrtBase("slotsmgrtone", 5, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Slot slot_db(svr->storage_); |
| |
| auto s = slot_db.MigrateOne(host, port, timeout, args_[4]); |
| *output = s.IsOK() ? Redis::Integer(1) : Redis::Integer(0); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSlotsMgrtTagSlot : public CommandSlotsMgrtBase { |
| public: |
| CommandSlotsMgrtTagSlot() : CommandSlotsMgrtBase("slotsmgrttagslot", 5, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Slot slot_db(svr->storage_); |
| |
| uint32_t slot_num; |
| try { |
| slot_num = std::stoi(args_[4]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| int ret; |
| auto s = slot_db.MigrateTagSlot(host, port, timeout, slot_num, &ret); |
| uint64_t size; |
| auto st = slot_db.Size(slot_num, &size); |
| |
| output->append(Redis::MultiLen(2)); |
| output->append(s.IsOK() ? Redis::Integer(ret) : Redis::Integer(0)); |
| output->append(Redis::Integer(size)); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSlotsMgrtTagOne : public CommandSlotsMgrtBase { |
| public: |
| CommandSlotsMgrtTagOne() : CommandSlotsMgrtBase("slotsmgrttagone", 5, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Slot slot_db(svr->storage_); |
| |
| int ret; |
| auto s = slot_db.MigrateTag(host, port, timeout, args_[4], &ret); |
| *output = s.IsOK() ? Redis::Integer(ret) : Redis::Integer(0); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSlotsMgrtTagSlotAsync : public CommandSlotsMgrtBase { |
| public: |
| CommandSlotsMgrtTagSlotAsync() |
| : CommandSlotsMgrtBase("slotsmgrttagslot-async", 8, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| uint32_t slot_num; |
| try { |
| slot_num = std::stoi(args_[6]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| uint32_t key_num; |
| try { |
| key_num = std::stoi(args_[7]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| auto s = svr->slotsmgrt_sender_thread_->SlotsMigrateBatch(host, port, timeout, slot_num, key_num); |
| if (!s.IsOK()) { |
| LOG(WARNING) << "Slot batch migrate keys error"; |
| return Status(Status::RedisExecErr, "Slot batch migrating keys error: " + s.Msg()); |
| } |
| |
| uint64_t moved = 0, remained = 0; |
| s = svr->slotsmgrt_sender_thread_->GetSlotsMigrateResult(&moved, &remained); |
| if (!s.IsOK()) { |
| LOG(WARNING) << "Slot batch migrate keys get result error"; |
| return Status(Status::RedisExecErr, "Slot batch migrating keys get result error"); |
| } |
| output->append(Redis::MultiLen(2)); |
| output->append(Redis::Integer(moved)); |
| output->append(Redis::Integer(remained)); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSlotsMgrtSlotAsync : public CommandSlotsMgrtTagSlotAsync { |
| public: |
| CommandSlotsMgrtSlotAsync() : CommandSlotsMgrtTagSlotAsync() { name_ = "slotsmgrtslot-async"; } |
| }; |
| |
| class CommandSlotsMgrtExecWrapper : public Commander { |
| public: |
| CommandSlotsMgrtExecWrapper() : Commander("slotsmgrt-exec-wrapper", -3, true) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Slot slot_db(svr->storage_); |
| |
| int ret; |
| auto s = svr->slotsmgrt_sender_thread_->SlotsMigrateOne(args_[1], &ret); |
| output->append(Redis::MultiLen(2)); |
| output->append(Redis::Integer(ret)); |
| output->append(Redis::Integer(ret)); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSlotsMgrtAsyncStatus : public Commander { |
| public: |
| CommandSlotsMgrtAsyncStatus() : Commander("slotsmgrt-async-status", 1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (!svr->GetConfig()->codis_enabled) { |
| return Status(Status::RedisExecErr, "codis is no enabled"); |
| } |
| std::string ip; |
| int port; |
| uint32_t slot_num; |
| bool migrating; |
| uint64_t moved_keys; |
| uint64_t remain_keys; |
| auto s = svr->slotsmgrt_sender_thread_->GetSlotsMgrtSenderStatus(&ip, |
| &port, |
| &slot_num, |
| &migrating, |
| &moved_keys, |
| &remain_keys); |
| |
| std::string migrate_status = migrating ? "yes" : "no"; |
| std::vector<std::string> list; |
| list.emplace_back(Redis::BulkString("host")); |
| list.emplace_back(Redis::BulkString(ip)); |
| list.emplace_back(Redis::BulkString("port")); |
| list.emplace_back(Redis::Integer(port)); |
| list.emplace_back(Redis::BulkString("slot number")); |
| list.emplace_back(Redis::BulkString(std::to_string(slot_num))); |
| list.emplace_back(Redis::BulkString("migrating")); |
| list.emplace_back(Redis::BulkString(migrate_status)); |
| list.emplace_back(Redis::BulkString("moved keys")); |
| list.emplace_back(Redis::BulkString(std::to_string(moved_keys))); |
| list.emplace_back(Redis::BulkString("remain keys")); |
| list.emplace_back(Redis::BulkString(std::to_string(remain_keys))); |
| |
| *output = Redis::MultiBulkString(list); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSlotsMgrtAsyncCancel : public Commander { |
| public: |
| CommandSlotsMgrtAsyncCancel() : Commander("slotsmgrt-async-cancel", 1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (!svr->GetConfig()->codis_enabled) { |
| return Status(Status::RedisExecErr, "codis is no enabled"); |
| } |
| |
| auto s = svr->slotsmgrt_sender_thread_->SlotsMigrateAsyncCancel(); |
| *output = s.IsOK() ? Redis::Integer(1) : Redis::Integer(0); |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSlotsRestore : public Commander { |
| public: |
| CommandSlotsRestore() : Commander("slotsrestore", -4, false) {} |
| |
| Status Parse(const std::vector<std::string> &args) override { |
| if ((args_.size() - 4) % 3 != 0) { |
| return Status(Status::RedisParseErr, errWrongNumOfArguments); |
| } |
| for (unsigned int i = 1; i < args_.size(); i += 3) { |
| int ttl; |
| try { |
| ttl = std::stoi(args_[i + 1]); |
| } catch (std::exception &e) { |
| return Status(Status::RedisParseErr, errValueNotInterger); |
| } |
| key_values_.push_back(KeyValue{args_[i], ttl, args_[i + 2]}); |
| } |
| return Commander::Parse(args); |
| } |
| |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Slot slot_db(svr->storage_); |
| auto s = slot_db.Restore(key_values_); |
| if (!s.ok()) { |
| *output = Redis::Error(s.ToString()); |
| } else { |
| *output = Redis::SimpleString("OK"); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| std::vector<KeyValue> key_values_; |
| }; |
| |
| class CommandSlotsHashKey : public Commander { |
| public: |
| CommandSlotsHashKey() : Commander("slotshashkey", -2, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| Redis::Slot slot_db(svr->storage_); |
| std::vector<uint32_t> slot_nums; |
| for (size_t i = 1; i < args_.size(); i++) { |
| auto slot_num = GetSlotNumFromKey(args_[i]); |
| slot_nums.emplace_back(slot_num); |
| } |
| output->append(Redis::MultiLen(slot_nums.size())); |
| for (const auto slot_num : slot_nums) { |
| output->append(Redis::Integer(slot_num)); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| class CommandSlotsCheck : public Commander { |
| public: |
| CommandSlotsCheck() : Commander("slotscheck", 1, false) {} |
| Status Execute(Server *svr, Connection *conn, std::string *output) override { |
| if (!conn->IsAdmin()) { |
| *output = Redis::Error(errAdministorPermissionRequired); |
| return Status::OK(); |
| } |
| Redis::Slot slot_db(svr->storage_); |
| auto s = slot_db.Check(); |
| if (!s.ok()) { |
| *output = Redis::Error(s.ToString()); |
| } else { |
| *output = Redis::SimpleString("OK"); |
| } |
| return Status::OK(); |
| } |
| }; |
| |
| #define ADD_CMD(name, fn) \ |
| {name, []() -> std::unique_ptr<Commander> { \ |
| return std::unique_ptr<Commander>(new fn()); \ |
| }} |
| |
| using CommanderFactory = std::function<std::unique_ptr<Commander>()>; |
| std::map<std::string, CommanderFactory> command_table = { |
| ADD_CMD("auth", CommandAuth), |
| ADD_CMD("ping", CommandPing), |
| ADD_CMD("select", CommandSelect), |
| ADD_CMD("info", CommandInfo), |
| ADD_CMD("config", CommandConfig), |
| ADD_CMD("namespace", CommandNamespace), |
| ADD_CMD("keys", CommandKeys), |
| ADD_CMD("flushdb", CommandFlushDB), |
| ADD_CMD("flushall", CommandFlushAll), |
| ADD_CMD("dbsize", CommandDBSize), |
| ADD_CMD("slowlog", CommandSlowlog), |
| ADD_CMD("perflog", CommandPerfLog), |
| ADD_CMD("client", CommandClient), |
| ADD_CMD("monitor", CommandMonitor), |
| ADD_CMD("shutdown", CommandShutdown), |
| ADD_CMD("quit", CommandQuit), |
| ADD_CMD("scan", CommandScan), |
| ADD_CMD("randomkey", CommandRandomKey), |
| |
| // key command |
| ADD_CMD("ttl", CommandTTL), |
| ADD_CMD("pttl", CommandPTTL), |
| ADD_CMD("type", CommandType), |
| ADD_CMD("object", CommandObject), |
| ADD_CMD("exists", CommandExists), |
| ADD_CMD("persist", CommandPersist), |
| ADD_CMD("expire", CommandExpire), |
| ADD_CMD("pexpire", CommandPExpire), |
| ADD_CMD("expireat", CommandExpireAt), |
| ADD_CMD("pexpireat", CommandPExpireAt), |
| ADD_CMD("del", CommandDel), |
| |
| // string command |
| ADD_CMD("get", CommandGet), |
| ADD_CMD("strlen", CommandStrlen), |
| ADD_CMD("getset", CommandGetSet), |
| ADD_CMD("getrange", CommandGetRange), |
| ADD_CMD("setrange", CommandSetRange), |
| ADD_CMD("mget", CommandMGet), |
| ADD_CMD("append", CommandAppend), |
| ADD_CMD("set", CommandSet), |
| ADD_CMD("setex", CommandSetEX), |
| ADD_CMD("setnx", CommandSetNX), |
| ADD_CMD("mset", CommandMSet), |
| ADD_CMD("incrby", CommandIncrBy), |
| ADD_CMD("incrbyfloat", CommandIncrByFloat), |
| ADD_CMD("incr", CommandIncr), |
| ADD_CMD("decrby", CommandDecrBy), |
| ADD_CMD("decr", CommandDecr), |
| |
| // bit command |
| ADD_CMD("getbit", CommandGetBit), |
| ADD_CMD("setbit", CommandSetBit), |
| ADD_CMD("msetbit", CommandMSetBit), |
| ADD_CMD("bitcount", CommandBitCount), |
| ADD_CMD("bitpos", CommandBitPos), |
| |
| // hash command |
| ADD_CMD("hget", CommandHGet), |
| ADD_CMD("hincrby", CommandHIncrBy), |
| ADD_CMD("hincrbyfloat", CommandHIncrByFloat), |
| ADD_CMD("hset", CommandHSet), |
| ADD_CMD("hsetnx", CommandHSetNX), |
| ADD_CMD("hdel", CommandHDel), |
| ADD_CMD("hstrlen", CommandHStrlen), |
| ADD_CMD("hexists", CommandHExists), |
| ADD_CMD("hlen", CommandHLen), |
| ADD_CMD("hmget", CommandHMGet), |
| ADD_CMD("hmset", CommandHMSet), |
| ADD_CMD("hkeys", CommandHKeys), |
| ADD_CMD("hvals", CommandHVals), |
| ADD_CMD("hgetall", CommandHGetAll), |
| ADD_CMD("hscan", CommandHScan), |
| |
| // list command |
| ADD_CMD("lpush", CommandLPush), |
| ADD_CMD("rpush", CommandRPush), |
| ADD_CMD("lpushx", CommandLPushX), |
| ADD_CMD("rpushx", CommandRPushX), |
| ADD_CMD("lpop", CommandLPop), |
| ADD_CMD("rpop", CommandRPop), |
| ADD_CMD("blpop", CommandBLPop), |
| ADD_CMD("brpop", CommandBRPop), |
| ADD_CMD("lrem", CommandLRem), |
| ADD_CMD("linsert", CommandLInsert), |
| ADD_CMD("lrange", CommandLRange), |
| ADD_CMD("lindex", CommandLIndex), |
| ADD_CMD("ltrim", CommandLTrim), |
| ADD_CMD("llen", CommandLLen), |
| ADD_CMD("lset", CommandLSet), |
| ADD_CMD("rpoplpush", CommandRPopLPUSH), |
| |
| // set command |
| ADD_CMD("sadd", CommandSAdd), |
| ADD_CMD("srem", CommandSRem), |
| ADD_CMD("scard", CommandSCard), |
| ADD_CMD("smembers", CommandSMembers), |
| ADD_CMD("sismember", CommandSIsMember), |
| ADD_CMD("spop", CommandSPop), |
| ADD_CMD("srandmember", CommandSRandMember), |
| ADD_CMD("smove", CommandSMove), |
| ADD_CMD("sdiff", CommandSDiff), |
| ADD_CMD("sunion", CommandSUnion), |
| ADD_CMD("sinter", CommandSInter), |
| ADD_CMD("sdiffstore", CommandSDiffStore), |
| ADD_CMD("sunionstore", CommandSUnionStore), |
| ADD_CMD("sinterstore", CommandSInterStore), |
| ADD_CMD("sscan", CommandSScan), |
| |
| // zset command |
| ADD_CMD("zadd", CommandZAdd), |
| ADD_CMD("zcard", CommandZCard), |
| ADD_CMD("zcount", CommandZCount), |
| ADD_CMD("zincrby", CommandZIncrBy), |
| ADD_CMD("zinterstore", CommandZInterStore), |
| ADD_CMD("zlexcount", CommandZLexCount), |
| ADD_CMD("zpopmax", CommandZPopMax), |
| ADD_CMD("zpopmin", CommandZPopMin), |
| ADD_CMD("zrange", CommandZRange), |
| ADD_CMD("zrevrange", CommandZRevRange), |
| ADD_CMD("zrangebylex", CommandZRangeByLex), |
| ADD_CMD("zrangebyscore", CommandZRangeByScore), |
| ADD_CMD("zrank", CommandZRank), |
| ADD_CMD("zrem", CommandZRem), |
| ADD_CMD("zremrangebyrank", CommandZRemRangeByRank), |
| ADD_CMD("zremrangebyscore", CommandZRemRangeByScore), |
| ADD_CMD("zremrangebylex", CommandZRemRangeByLex), |
| ADD_CMD("zrevrangebyscore", CommandZRevRangeByScore), |
| ADD_CMD("zrevrank", CommandZRevRank), |
| ADD_CMD("zscore", CommandZScore), |
| ADD_CMD("zscan", CommandZScan), |
| ADD_CMD("zunionstore", CommandZUnionStore), |
| |
| // geo command |
| ADD_CMD("geoadd", CommandGeoAdd), |
| ADD_CMD("geodist", CommandGeoDist), |
| ADD_CMD("geohash", CommandGeoHash), |
| ADD_CMD("geopos", CommandGeoPos), |
| ADD_CMD("georadius", CommandGeoRadius), |
| ADD_CMD("georadiusbymember", CommandGeoRadiusByMember), |
| ADD_CMD("georadius_ro", CommandGeoRadiusReadonly), |
| ADD_CMD("georadiusbymember_ro", CommandGeoRadiusByMemberReadonly), |
| |
| // pub/sub command |
| ADD_CMD("publish", CommandPublish), |
| ADD_CMD("subscribe", CommandSubscribe), |
| ADD_CMD("unsubscribe", CommandUnSubscribe), |
| ADD_CMD("psubscribe", CommandPSubscribe), |
| ADD_CMD("punsubscribe", CommandPUnSubscribe), |
| ADD_CMD("pubsub", CommandPubSub), |
| |
| // Sortedint command |
| ADD_CMD("siadd", CommandSortedintAdd), |
| ADD_CMD("sirem", CommandSortedintRem), |
| ADD_CMD("sicard", CommandSortedintCard), |
| ADD_CMD("sirange", CommandSortedintRange), |
| ADD_CMD("sirevrange", CommandSortedintRevRange), |
| |
| // Codis Slot command |
| ADD_CMD("slotsinfo", CommandSlotsInfo), |
| ADD_CMD("slotsscan", CommandSlotsScan), |
| ADD_CMD("slotsdel", CommandSlotsDel), |
| ADD_CMD("slotsmgrtslot", CommandSlotsMgrtSlot), |
| ADD_CMD("slotsmgrtone", CommandSlotsMgrtOne), |
| ADD_CMD("slotsmgrttagslot", CommandSlotsMgrtTagSlot), |
| ADD_CMD("slotsmgrttagone", CommandSlotsMgrtTagOne), |
| ADD_CMD("slotsrestore", CommandSlotsRestore), |
| ADD_CMD("slotshashkey", CommandSlotsHashKey), |
| ADD_CMD("slotscheck", CommandSlotsCheck), |
| ADD_CMD("slotsmgrtslot-async", CommandSlotsMgrtSlotAsync), |
| ADD_CMD("slotsmgrttagslot-async", CommandSlotsMgrtTagSlotAsync), |
| ADD_CMD("slotsmgrt-exec-wrapper", CommandSlotsMgrtExecWrapper), |
| ADD_CMD("slotsmgrt-async-status", CommandSlotsMgrtAsyncStatus), |
| ADD_CMD("slotsmgrt-async-cancel", CommandSlotsMgrtAsyncCancel), |
| |
| // internal management cmd |
| ADD_CMD("compact", CommandCompact), |
| ADD_CMD("bgsave", CommandBGSave), |
| ADD_CMD("slaveof", CommandSlaveOf), |
| ADD_CMD("stats", CommandStats), |
| }; |
| |
| // Replication related commands, which are received by workers listening on |
| // `repl-port` |
| std::map<std::string, CommanderFactory> repl_command_table = { |
| ADD_CMD("auth", CommandAuth), |
| ADD_CMD("replconf", CommandReplConf), |
| ADD_CMD("psync", CommandPSync), |
| ADD_CMD("_fetch_meta", CommandFetchMeta), |
| ADD_CMD("_fetch_file", CommandFetchFile), |
| ADD_CMD("_db_name", CommandDBName), |
| }; |
| |
| Status LookupCommand(const std::string &cmd_name, |
| std::unique_ptr<Commander> *cmd, bool is_repl) { |
| if (cmd_name.empty()) return Status(Status::RedisUnknownCmd); |
| if (is_repl) { |
| auto cmd_factory = repl_command_table.find(Util::ToLower(cmd_name)); |
| if (cmd_factory == repl_command_table.end()) { |
| return Status(Status::RedisUnknownCmd); |
| } |
| *cmd = cmd_factory->second(); |
| } else { |
| auto cmd_factory = command_table.find(Util::ToLower(cmd_name)); |
| if (cmd_factory == command_table.end()) { |
| return Status(Status::RedisUnknownCmd); |
| } |
| *cmd = cmd_factory->second(); |
| } |
| return Status::OK(); |
| } |
| |
| bool IsCommandExists(const std::string &cmd) { |
| return command_table.find(cmd) != command_table.end(); |
| } |
| |
| void GetCommandList(std::vector<std::string> *cmds) { |
| cmds->clear(); |
| for (const auto &cmd : command_table) { |
| cmds->emplace_back(cmd.first); |
| } |
| for (const auto &cmd : repl_command_table) { |
| cmds->emplace_back(cmd.first); |
| } |
| } |
| } // namespace Redis |