blob: 65c05ac2eb7c90956affcf207f0da7442e4cd2e5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "redis_parser.h"
#include <rocksdb/status.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication_other_types.h>
#include <dsn/utility/string_conv.h>
#include <rrdb/rrdb.client.h>
#include <pegasus/error.h>
#include <pegasus_key_schema.h>
#include <pegasus_utils.h>
#include "base/pegasus_const.h"
namespace pegasus {
namespace proxy {
std::atomic_llong redis_parser::s_next_seqid(0);
const char redis_parser::CR = '\015';
const char redis_parser::LF = '\012';
std::unordered_map<std::string, redis_parser::redis_call_handler> redis_parser::s_dispatcher = {
{"SET", redis_parser::g_set},
{"GET", redis_parser::g_get},
{"DEL", redis_parser::g_del},
{"SETEX", redis_parser::g_setex},
{"TTL", redis_parser::g_ttl},
{"PTTL", redis_parser::g_ttl},
{"GEOADD", redis_parser::g_geo_add},
{"GEODIST", redis_parser::g_geo_dist},
{"GEOPOS", redis_parser::g_geo_pos},
{"GEORADIUS", redis_parser::g_geo_radius},
{"GEORADIUSBYMEMBER", redis_parser::g_geo_radius_by_member},
{"INCR", redis_parser::g_incr},
{"INCRBY", redis_parser::g_incr_by},
{"DECR", redis_parser::g_decr},
{"DECRBY", redis_parser::g_decr_by},
};
redis_parser::redis_call_handler redis_parser::get_handler(const char *command, unsigned int length)
{
std::string key(command, length);
std::transform(key.begin(), key.end(), key.begin(), toupper);
auto iter = s_dispatcher.find(key);
if (iter == s_dispatcher.end())
return redis_parser::g_default_handler;
return iter->second;
}
redis_parser::redis_parser(proxy_stub *op, dsn::message_ex *first_msg)
: proxy_session(op, first_msg),
_current_msg(new message_entry()),
_status(kStartArray),
_current_size(),
_total_length(0),
_current_buffer(nullptr),
_current_buffer_length(0),
_current_cursor(0)
{
::dsn::apps::rrdb_client *r;
if (op) {
std::vector<dsn::rpc_address> meta_list;
dsn::replication::replica_helper::load_meta_servers(
meta_list, PEGASUS_CLUSTER_SECTION_NAME.c_str(), op->get_cluster());
r = new ::dsn::apps::rrdb_client(op->get_cluster(), meta_list, op->get_app());
if (strlen(op->get_geo_app()) != 0) {
_geo_client = dsn::make_unique<geo::geo_client>(
"config.ini", op->get_cluster(), op->get_app(), op->get_geo_app());
}
} else {
r = new ::dsn::apps::rrdb_client();
}
client.reset(r);
}
redis_parser::~redis_parser()
{
clear_reply_queue();
reset_parser();
ddebug("%s: redis parser destroyed", _remote_address.to_string());
}
void redis_parser::prepare_current_buffer()
{
void *msg_buffer;
if (_current_buffer == nullptr) {
dsn::message_ex *first_msg = _recv_buffers.front();
dassert(
first_msg->read_next(&msg_buffer, &_current_buffer_length),
"read dsn::message_ex* failed, msg from_address = %s, to_address = %s, rpc_name = %s",
first_msg->header->from_address.to_string(),
first_msg->to_address.to_string(),
first_msg->header->rpc_name);
_current_buffer = reinterpret_cast<char *>(msg_buffer);
_current_cursor = 0;
} else if (_current_cursor >= _current_buffer_length) {
dsn::message_ex *first_msg = _recv_buffers.front();
first_msg->read_commit(_current_buffer_length);
if (first_msg->read_next(&msg_buffer, &_current_buffer_length)) {
_current_buffer = reinterpret_cast<char *>(msg_buffer);
_current_cursor = 0;
} else {
// we have consume this message all over
// reference is added in append message
first_msg->release_ref();
_recv_buffers.pop();
_current_buffer = nullptr;
prepare_current_buffer();
}
}
}
void redis_parser::reset_parser()
{
// clear the parser status
_current_msg->request.sub_request_count = 0;
_current_msg->request.sub_requests.clear();
_status = kStartArray;
_current_size.clear();
// clear the data stream
_total_length = 0;
if (_current_buffer) {
_recv_buffers.front()->read_commit(_current_buffer_length);
}
_current_buffer = nullptr;
_current_buffer_length = 0;
_current_cursor = 0;
while (!_recv_buffers.empty()) {
_recv_buffers.front()->release_ref();
_recv_buffers.pop();
}
}
char redis_parser::peek()
{
prepare_current_buffer();
return _current_buffer[_current_cursor];
}
bool redis_parser::eat(char c)
{
if (dsn_likely(peek() == c)) {
++_current_cursor;
--_total_length;
return true;
} else {
derror_f("{}: expect token: {}, got {}", _remote_address.to_string(), c, peek());
return false;
}
}
void redis_parser::eat_all(char *dest, size_t length)
{
_total_length -= length;
while (length > 0) {
prepare_current_buffer();
size_t eat_size = _current_buffer_length - _current_cursor;
if (eat_size > length) {
eat_size = length;
}
memcpy(dest, _current_buffer + _current_cursor, eat_size);
dest += eat_size;
_current_cursor += eat_size;
length -= eat_size;
}
}
bool redis_parser::end_array_size()
{
int32_t count = 0;
if (dsn_unlikely(!dsn::buf2int32(dsn::string_view(_current_size), count))) {
derror_f(
"{}: invalid size string \"{}\"", _remote_address.to_string(), _current_size.c_str());
return false;
}
if (dsn_unlikely(count <= 0)) {
derror_f("{}: array size should be positive in redis request, but got {}",
_remote_address.to_string(),
count);
return false;
}
redis_request &current_request = _current_msg->request;
current_request.sub_request_count = count;
current_request.sub_requests.reserve(count);
_current_size.clear();
_status = kStartBulkString;
return true;
}
void redis_parser::append_current_bulk_string()
{
redis_request &current_array = _current_msg->request;
current_array.sub_requests.push_back(_current_str);
if (current_array.sub_requests.size() == current_array.sub_request_count) {
// we get a full request command
handle_command(std::move(_current_msg));
_current_msg.reset(new message_entry());
_status = kStartArray;
} else {
_status = kStartBulkString;
}
}
bool redis_parser::end_bulk_string_size()
{
int32_t length = 0;
if (dsn_unlikely(!dsn::buf2int32(
dsn::string_view(_current_size.c_str(), _current_size.length()), length))) {
derror_f(
"{}: invalid size string \"{}\"", _remote_address.to_string(), _current_size.c_str());
return false;
}
_current_str.length = length;
_current_str.data.assign(nullptr, 0, 0);
_current_size.clear();
if (-1 == _current_str.length) {
append_current_bulk_string();
return true;
}
if (_current_str.length >= 0) {
_status = kStartBulkStringData;
return true;
}
derror_f(
"{}: invalid bulk string length: {}", _remote_address.to_string(), _current_str.length);
return false;
}
void redis_parser::append_message(dsn::message_ex *msg)
{
msg->add_ref();
_recv_buffers.push(msg);
_total_length += msg->body_size();
dinfo_f(
"{}: recv message, currently total length: {}", _remote_address.to_string(), _total_length);
}
// refererence: http://redis.io/topics/protocol
bool redis_parser::parse_stream()
{
char t;
while (_total_length > 0) {
switch (_status) {
case kStartArray:
dverify(eat('*'));
_status = kInArraySize;
break;
case kStartBulkString:
dverify(eat('$'));
_status = kInBulkStringSize;
break;
case kInArraySize:
case kInBulkStringSize:
t = peek();
if (t == CR) {
if (_total_length > 1) {
dverify(eat(CR));
dverify(eat(LF));
if (kInArraySize == _status) {
dverify(end_array_size());
} else {
dverify(end_bulk_string_size());
}
} else {
return true;
}
} else {
_current_size.push_back(t);
dverify(eat(t));
}
break;
case kStartBulkStringData:
// string content + CR + LF
if (_total_length >= _current_str.length + 2) {
if (_current_str.length > 0) {
std::string str_data(_current_str.length, '\0');
eat_all(const_cast<char *>(str_data.data()), _current_str.length);
_current_str.data = dsn::blob::create_from_bytes(std::move(str_data));
}
dverify(eat(CR));
dverify(eat(LF));
append_current_bulk_string();
} else {
return true;
}
break;
default:
break;
}
}
return true;
}
bool redis_parser::parse(dsn::message_ex *msg)
{
append_message(msg);
if (parse_stream()) {
return true;
} else {
// when parse a new message failed, we only reset the parser.
// for pending responses msg queue, we should keep it as it is.
reset_parser();
return false;
}
}
void redis_parser::enqueue_pending_response(std::unique_ptr<message_entry> &&entry)
{
dsn::zauto_lock l(response_lock);
pending_response.emplace_back(std::move(entry));
}
void redis_parser::fetch_and_dequeue_messages(std::vector<dsn::message_ex *> &msgs,
bool only_ready_ones)
{
dsn::zauto_lock l(response_lock);
while (!pending_response.empty()) {
message_entry *entry = pending_response.front().get();
dsn::message_ex *r = entry->response.load(std::memory_order_acquire);
if (only_ready_ones && r == nullptr) {
break;
} else {
msgs.push_back(r);
pending_response.pop_front();
}
}
}
void redis_parser::clear_reply_queue()
{
// clear the response pipeline
std::vector<dsn::message_ex *> all_responses;
fetch_and_dequeue_messages(all_responses, false);
for (dsn::message_ex *m : all_responses) {
if (m != nullptr) {
m->release_ref();
}
}
}
void redis_parser::reply_all_ready()
{
std::vector<dsn::message_ex *> ready_responses;
fetch_and_dequeue_messages(ready_responses, true);
for (dsn::message_ex *m : ready_responses) {
dassert(m != nullptr, "");
dsn_rpc_reply(m, ::dsn::ERR_OK);
// added when message is created
m->release_ref();
}
}
std::shared_ptr<redis_parser::redis_bulk_string> redis_parser::construct_bulk_string(double data)
{
std::string data_str(std::to_string(data));
std::shared_ptr<char> buf = dsn::utils::make_shared_array<char>(data_str.size());
memcpy(buf.get(), data_str.data(), data_str.size());
return std::make_shared<redis_bulk_string>(dsn::blob(std::move(buf), (int)data_str.size()));
}
void redis_parser::simple_ok_reply(message_entry &entry)
{
simple_string_reply(entry, false, std::move("OK"));
}
void redis_parser::simple_error_reply(message_entry &entry, const std::string &message)
{
simple_string_reply(entry, true, std::move(std::string("ERR ").append(message)));
}
void redis_parser::simple_string_reply(message_entry &entry, bool is_error, std::string message)
{
reply_message(entry, redis_simple_string(is_error, std::move(message)));
}
void redis_parser::simple_integer_reply(message_entry &entry, int64_t value)
{
reply_message(entry, redis_integer(value));
}
void redis_parser::default_handler(redis_parser::message_entry &entry)
{
::dsn::blob &cmd = entry.request.sub_requests[0].data;
std::string message = "unknown command '" + std::string(cmd.data(), cmd.length()) + "'";
ddebug_f("{}: {} with seqid {}", _remote_address.to_string(), message, entry.sequence_id);
simple_error_reply(entry, message);
}
void redis_parser::set(redis_parser::message_entry &entry)
{
if (_geo_client == nullptr) {
set_internal(entry);
} else {
set_geo_internal(entry);
}
}
void redis_parser::set_internal(redis_parser::message_entry &entry)
{
redis_request &request = entry.request;
if (request.sub_requests.size() < 3) {
ddebug("%s: set command with invalid arguments, seqid(%" PRId64 ")",
_remote_address.to_string(),
entry.sequence_id);
simple_error_reply(entry, "wrong number of arguments for 'set' command");
} else {
int ttl_seconds = 0;
parse_set_parameters(request.sub_requests, ttl_seconds);
// with a reference to prevent the object from being destroyed
std::shared_ptr<proxy_session> ref_this = shared_from_this();
dinfo("%s: send set command(%" PRId64 ")", _remote_address.to_string(), entry.sequence_id);
auto on_set_reply = [ref_this, this, &entry](
::dsn::error_code ec, dsn::message_ex *, dsn::message_ex *response) {
// when the "is_session_reset" flag is set, the socket may be broken.
// so continue to reply the message is not necessary
if (_is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: set command seqid(%" PRId64 ") got reply, but session has reset",
_remote_address.to_string(),
entry.sequence_id);
return;
}
// the message_enry "entry" is stored in the queue "pending_response".
// please ensure that "entry" hasn't been released right now.
//
// currently we only clear an entry when it is replied or
// in the redis_parser's destructor
dinfo("%s: set command seqid(%" PRId64 ") got reply",
_remote_address.to_string(),
entry.sequence_id);
if (::dsn::ERR_OK != ec) {
ddebug("%s: set command seqid(%" PRId64 ") got reply with error = %s",
_remote_address.to_string(),
entry.sequence_id,
ec.to_string());
simple_error_reply(entry, ec.to_string());
} else {
::dsn::apps::update_response rrdb_response;
::dsn::unmarshall(response, rrdb_response);
if (rrdb_response.error != 0) {
simple_error_reply(entry,
"internal error " + std::to_string(rrdb_response.error));
} else {
simple_ok_reply(entry);
}
}
};
::dsn::apps::update_request req;
::dsn::blob null_blob;
pegasus_generate_key(req.key, request.sub_requests[1].data, null_blob);
req.value = request.sub_requests[2].data;
if (ttl_seconds == 0)
req.expire_ts_seconds = 0;
else
req.expire_ts_seconds = ttl_seconds + utils::epoch_now();
auto partition_hash = pegasus_key_hash(req.key);
// TODO: set the timeout
client->put(req, on_set_reply, std::chrono::milliseconds(2000), 0, partition_hash);
}
}
// origin command format:
// SET key value [EX seconds] [PX milliseconds] [NX|XX]
// NOTE: only 'EX' option is supported
void redis_parser::set_geo_internal(message_entry &entry)
{
if (!_geo_client) {
return simple_error_reply(entry, "redis proxy is not on GEO mode");
}
redis_request &redis_request = entry.request;
if (redis_request.sub_requests.size() < 3) {
simple_error_reply(entry, "wrong number of arguments for 'SET' command");
} else {
int ttl_seconds = 0;
parse_set_parameters(redis_request.sub_requests, ttl_seconds);
// with a reference to prevent the object from being destroyed
std::shared_ptr<proxy_session> ref_this = shared_from_this();
auto set_callback = [ref_this, this, &entry](int ec, pegasus_client::internal_info &&) {
if (_is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: setex command seqid(%" PRId64 ") got reply, but session has reset",
_remote_address.to_string(),
entry.sequence_id);
return;
}
if (PERR_OK != ec) {
simple_error_reply(entry, _geo_client->get_error_string(ec));
} else {
simple_ok_reply(entry);
}
};
_geo_client->async_set(redis_request.sub_requests[1].data.to_string(), // key => hash_key
std::string(), // "" => sort_key
redis_request.sub_requests[2].data.to_string(), // value
set_callback,
2000, // TODO: set the timeout
ttl_seconds);
}
}
void redis_parser::setex(message_entry &entry)
{
redis_request &redis_req = entry.request;
// setex key ttl_SECONDS value
if (redis_req.sub_requests.size() != 4) {
ddebug("%s: setex command seqid(%" PRId64 ") with invalid arguments",
_remote_address.to_string(),
entry.sequence_id);
simple_error_reply(entry, "wrong number of arguments for 'setex' command");
} else {
dinfo("%s: send setex command seqid(%" PRId64 ")",
_remote_address.to_string(),
entry.sequence_id);
::dsn::blob &ttl_blob = redis_req.sub_requests[2].data;
int ttl_seconds;
if (!dsn::buf2int32(ttl_blob, ttl_seconds)) {
simple_error_reply(entry, "value is not an integer or out of range");
return;
}
if (ttl_seconds <= 0) {
simple_error_reply(entry, "invalid expire time in setex");
return;
}
std::shared_ptr<proxy_session> ref_this = shared_from_this();
auto on_setex_reply = [ref_this, this, &entry](
::dsn::error_code ec, dsn::message_ex *, dsn::message_ex *response) {
if (_is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: setex command seqid(%" PRId64 ") got reply, but session has reset",
_remote_address.to_string(),
entry.sequence_id);
return;
}
if (::dsn::ERR_OK != ec) {
ddebug("%s: setex command seqid(%" PRId64 ") got reply with error = %s",
_remote_address.to_string(),
entry.sequence_id,
ec.to_string());
simple_error_reply(entry, ec.to_string());
return;
}
::dsn::apps::update_response rrdb_response;
::dsn::unmarshall(response, rrdb_response);
if (rrdb_response.error != 0) {
simple_error_reply(entry, "internal error " + std::to_string(rrdb_response.error));
return;
}
simple_ok_reply(entry);
};
::dsn::apps::update_request req;
::dsn::blob null_blob;
pegasus_generate_key(req.key, redis_req.sub_requests[1].data, null_blob);
req.value = redis_req.sub_requests[3].data;
req.expire_ts_seconds = pegasus::utils::epoch_now() + ttl_seconds;
auto partition_hash = pegasus_key_hash(req.key);
// TODO: set the timeout
client->put(req, on_setex_reply, std::chrono::milliseconds(2000), 0, partition_hash);
}
}
void redis_parser::get(message_entry &entry)
{
redis_request &redis_req = entry.request;
if (redis_req.sub_requests.size() != 2) {
ddebug("%s: get command seqid(%" PRId64 ") with invalid arguments",
_remote_address.to_string(),
entry.sequence_id);
simple_error_reply(entry, "wrong number of arguments for 'get' command");
} else {
dinfo("%s: send get command seqid(%" PRId64 ")",
_remote_address.to_string(),
entry.sequence_id);
std::shared_ptr<proxy_session> ref_this = shared_from_this();
auto on_get_reply = [ref_this, this, &entry](
::dsn::error_code ec, dsn::message_ex *, dsn::message_ex *response) {
if (_is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: get command(%" PRId64 ") got reply, but session has reset",
_remote_address.to_string(),
entry.sequence_id);
return;
}
if (::dsn::ERR_OK != ec) {
ddebug("%s: get command seqid(%" PRId64 ") got reply with error = %s",
_remote_address.to_string(),
entry.sequence_id,
ec.to_string());
simple_error_reply(entry, ec.to_string());
} else {
::dsn::apps::read_response rrdb_response;
::dsn::unmarshall(response, rrdb_response);
if (rrdb_response.error != 0) {
if (rrdb_response.error == rocksdb::Status::kNotFound) {
reply_message(entry, redis_bulk_string());
} else {
simple_error_reply(entry,
"internal error " + std::to_string(rrdb_response.error));
}
} else {
reply_message(entry, redis_bulk_string(rrdb_response.value));
}
}
};
::dsn::blob req;
::dsn::blob null_blob;
pegasus_generate_key(req, redis_req.sub_requests[1].data, null_blob);
auto partition_hash = pegasus_key_hash(req);
// TODO: set the timeout
client->get(req, on_get_reply, std::chrono::milliseconds(2000), 0, partition_hash);
}
}
void redis_parser::del(message_entry &entry)
{
if (_geo_client == nullptr) {
del_internal(entry);
} else {
del_geo_internal(entry);
}
}
void redis_parser::del_internal(message_entry &entry)
{
redis_request &redis_req = entry.request;
if (redis_req.sub_requests.size() != 2) {
ddebug("%s: del command seqid(%" PRId64 ") with invalid arguments",
_remote_address.to_string(),
entry.sequence_id);
simple_error_reply(entry, "wrong number of arguments for 'del' command");
} else {
dinfo("%s: send del command seqid(%" PRId64 ")",
_remote_address.to_string(),
entry.sequence_id);
std::shared_ptr<proxy_session> ref_this = shared_from_this();
auto on_del_reply = [ref_this, this, &entry](
::dsn::error_code ec, dsn::message_ex *, dsn::message_ex *response) {
if (_is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: del command seqid(%" PRId64 ") got reply, but session has reset",
_remote_address.to_string(),
entry.sequence_id);
return;
}
dinfo("%s: del command seqid(%" PRId64 ") got reply",
_remote_address.to_string(),
entry.sequence_id);
if (::dsn::ERR_OK != ec) {
ddebug("%s: del command seqid(%" PRId64 ") got reply with error = %s",
_remote_address.to_string(),
entry.sequence_id,
ec.to_string());
simple_error_reply(entry, ec.to_string());
} else {
::dsn::apps::read_response rrdb_response;
::dsn::unmarshall(response, rrdb_response);
if (rrdb_response.error != 0) {
simple_error_reply(entry,
"internal error " + std::to_string(rrdb_response.error));
} else {
// NOTE: Deleting a non-existed key returns 1 too. But standard Redis returns 0.
// Pegasus behaves
// differently in this case intentionally for performance.
// Because if we need to check the existence, we should use check_and_mutate
// instead.
simple_integer_reply(entry, 1);
}
}
};
::dsn::blob req;
::dsn::blob null_blob;
pegasus_generate_key(req, redis_req.sub_requests[1].data, null_blob);
auto partition_hash = pegasus_key_hash(req);
// TODO: set the timeout
client->remove(req, on_del_reply, std::chrono::milliseconds(2000), 0, partition_hash);
}
}
// origin command format:
// DEL key [key ...]
// NOTE: only one key is supported
void redis_parser::del_geo_internal(message_entry &entry)
{
if (!_geo_client) {
return simple_error_reply(entry, "redis proxy is not on GEO mode");
}
redis_request &redis_request = entry.request;
if (redis_request.sub_requests.size() != 2) {
simple_error_reply(entry, "wrong number of arguments for 'DEL' command");
} else {
// with a reference to prevent the object from being destroyed
std::shared_ptr<proxy_session> ref_this = shared_from_this();
auto del_callback = [ref_this, this, &entry](int ec, pegasus_client::internal_info &&) {
if (_is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: setex command seqid(%" PRId64 ") got reply, but session has reset",
_remote_address.to_string(),
entry.sequence_id);
return;
}
if (PERR_OK != ec) {
simple_error_reply(entry, _geo_client->get_error_string(ec));
} else {
simple_integer_reply(entry, 1);
}
};
_geo_client->async_del(redis_request.sub_requests[1].data.to_string(), // key => hash_key
std::string(), // "" => sort_key
false,
del_callback,
2000); // TODO: set the timeout
}
}
// process 'ttl' and 'pttl'
void redis_parser::ttl(message_entry &entry)
{
redis_request &redis_req = entry.request;
bool is_ttl = (toupper(redis_req.sub_requests[0].data.data()[0]) == 'T');
if (redis_req.sub_requests.size() != 2) {
ddebug("%s: ttl/pttl command seqid(%" PRId64 ") with invalid arguments",
_remote_address.to_string(),
entry.sequence_id);
simple_error_reply(
entry, fmt::format("wrong number of arguments for '{}'", is_ttl ? "ttl" : "pttl"));
} else {
dinfo("%s: send pttl/ttl command seqid(%" PRId64 ")",
_remote_address.to_string(),
entry.sequence_id);
std::shared_ptr<proxy_session> ref_this = shared_from_this();
auto on_ttl_reply = [ref_this, this, &entry, is_ttl](
::dsn::error_code ec, dsn::message_ex *, dsn::message_ex *response) {
if (_is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: ttl/pttl command seqid(%" PRId64 ") got reply, but session has reset",
_remote_address.to_string(),
entry.sequence_id);
return;
}
dinfo("%s: ttl/pttl command seqid(%" PRId64 ") got reply",
_remote_address.to_string(),
entry.sequence_id);
if (::dsn::ERR_OK != ec) {
ddebug("%s: del command seqid(%" PRId64 ") got reply with error = %s",
_remote_address.to_string(),
entry.sequence_id,
ec.to_string());
simple_error_reply(entry, ec.to_string());
} else {
::dsn::apps::ttl_response rrdb_response;
::dsn::unmarshall(response, rrdb_response);
if (rrdb_response.error != 0) {
if (rrdb_response.error == rocksdb::Status::kNotFound) {
simple_integer_reply(entry, -2);
} else {
simple_error_reply(entry,
"internal error " + std::to_string(rrdb_response.error));
}
} else {
simple_integer_reply(entry,
is_ttl ? rrdb_response.ttl_seconds
: rrdb_response.ttl_seconds * 1000);
}
}
};
::dsn::blob req;
::dsn::blob null_blob;
pegasus_generate_key(req, redis_req.sub_requests[1].data, null_blob);
auto partition_hash = pegasus_key_hash(req);
// TODO: set the timeout
client->ttl(req, on_ttl_reply, std::chrono::milliseconds(2000), 0, partition_hash);
}
}
// command format:
// GEORADIUS key longitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT
// count] [ASC|DESC] [STORE key] [STOREDIST key]
// NOTE: [STORE key] [STOREDIST key] are not supported
// NOTE: [WITHHASH] will return origin value of member when enabled
// NOTE: we use SET instead of GEOADD to insert data into pegasus, so there is not a `key` as in
// Redis(`GEOADD key longitude latitude member`), and we consider that all geo data in pegasus is
// under "" key, so when execute 'GEORADIUS' command, the `key` parameter will always be ignored and
// treated as "".
// eg: GEORADIUS "" 146.123 34.567 1000
void redis_parser::geo_radius(message_entry &entry)
{
if (!_geo_client) {
return simple_error_reply(entry, "redis proxy is not on GEO mode");
}
redis_request &redis_request = entry.request;
if (redis_request.sub_requests.size() < 5) {
simple_error_reply(entry, "wrong number of arguments for 'GEORADIUS' command");
return;
}
// longitude latitude
double lng_degrees = 0.0;
const std::string &str_lng_degrees = redis_request.sub_requests[2].data.to_string();
if (!dsn::buf2double(str_lng_degrees, lng_degrees)) {
dwarn_f("longitude parameter '{}' is error, use {}", str_lng_degrees, lng_degrees);
}
double lat_degrees = 0.0;
const std::string &str_lat_degrees = redis_request.sub_requests[3].data.to_string();
if (!dsn::buf2double(str_lat_degrees, lat_degrees)) {
dwarn_f("latitude parameter '{}' is error, use {}", str_lat_degrees, lat_degrees);
}
// radius m|km|ft|mi [WITHCOORD] [WITHDIST] [COUNT count] [ASC|DESC]
double radius_m = 100.0;
std::string unit;
geo::geo_client::SortType sort_type = geo::geo_client::SortType::random;
int count = -1;
bool WITHCOORD = false;
bool WITHDIST = false;
bool WITHHASH = false;
parse_geo_radius_parameters(redis_request.sub_requests,
4,
radius_m,
unit,
sort_type,
count,
WITHCOORD,
WITHDIST,
WITHHASH);
std::shared_ptr<proxy_session> ref_this = shared_from_this();
auto search_callback = [ref_this, this, &entry, unit, WITHCOORD, WITHDIST, WITHHASH](
int ec, std::list<geo::SearchResult> &&results) {
process_geo_radius_result(
entry, unit, WITHCOORD, WITHDIST, WITHHASH, ec, std::move(results));
};
_geo_client->async_search_radial(
lat_degrees, lng_degrees, radius_m, count, sort_type, 2000, search_callback);
}
// command format:
// GEORADIUSBYMEMBER key member radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count]
// [ASC|DESC] [STORE key] [STOREDIST key]
// NOTE: [STORE key] [STOREDIST key] are not supported
// NOTE: [WITHHASH] will return origin value of member when enabled
// NOTE: we use SET instead of GEOADD to insert data into pegasus, so there is not a `key` as in
// Redis(`GEOADD key longitude latitude member`), and we consider that all geo data in pegasus is
// under "" key, so when execute 'GEORADIUSBYMEMBER' command, the `key` parameter will always be
// ignored and
// treated as "", and the `member` parameter is treated as `key` which is inserted by SET command.
// eg: GEORADIUSBYMEMBER "" some_key 1000
void redis_parser::geo_radius_by_member(message_entry &entry)
{
if (!_geo_client) {
return simple_error_reply(entry, "redis proxy is not on GEO mode");
}
redis_request &redis_request = entry.request;
if (redis_request.sub_requests.size() < 4) {
simple_error_reply(entry, "wrong number of arguments for 'GEORADIUSBYMEMBER' command");
return;
}
// member
std::string hash_key = redis_request.sub_requests[2].data.to_string(); // member => hash_key
// radius m|km|ft|mi [WITHCOORD] [WITHDIST] [COUNT count] [ASC|DESC] [WITHHASH]
double radius_m = 100.0;
std::string unit;
geo::geo_client::SortType sort_type = geo::geo_client::SortType::random;
int count = -1;
bool WITHCOORD = false;
bool WITHDIST = false;
bool WITHHASH = false;
parse_geo_radius_parameters(redis_request.sub_requests,
3,
radius_m,
unit,
sort_type,
count,
WITHCOORD,
WITHDIST,
WITHHASH);
std::shared_ptr<proxy_session> ref_this = shared_from_this();
auto search_callback = [ref_this, this, &entry, unit, WITHCOORD, WITHDIST, WITHHASH](
int ec, std::list<geo::SearchResult> &&results) {
process_geo_radius_result(
entry, unit, WITHCOORD, WITHDIST, WITHHASH, ec, std::move(results));
};
_geo_client->async_search_radial(
hash_key, "", radius_m, count, sort_type, 2000, search_callback);
}
void redis_parser::incr(message_entry &entry) { counter_internal(entry); }
void redis_parser::incr_by(message_entry &entry) { counter_internal(entry); }
void redis_parser::decr(message_entry &entry) { counter_internal(entry); }
void redis_parser::decr_by(message_entry &entry) { counter_internal(entry); }
void redis_parser::counter_internal(message_entry &entry)
{
dassert(!entry.request.sub_requests.empty(), "");
dassert(entry.request.sub_requests[0].length > 0, "");
const char *command = entry.request.sub_requests[0].data.data();
int64_t increment = 1;
if (strcasecmp(command, "INCR") == 0 || strcasecmp(command, "DECR") == 0) {
if (entry.request.sub_requests.size() != 2) {
dwarn_f("{}: command {} seqid({}) with invalid arguments count: {}",
_remote_address.to_string(),
command,
entry.sequence_id,
entry.request.sub_requests.size());
simple_error_reply(entry, fmt::format("wrong number of arguments for '{}'", command));
return;
}
} else if (strcasecmp(command, "INCRBY") == 0 || strcasecmp(command, "DECRBY") == 0) {
if (entry.request.sub_requests.size() != 3) {
dwarn_f("{}: command {} seqid({}) with invalid arguments count: {}",
_remote_address.to_string(),
command,
entry.sequence_id,
entry.request.sub_requests.size());
simple_error_reply(entry, fmt::format("wrong number of arguments for '{}'", command));
return;
}
if (!dsn::buf2int64(entry.request.sub_requests[2].data, increment)) {
dwarn_f("{}: command {} seqid({}) with invalid 'increment': {}",
_remote_address.to_string(),
command,
entry.sequence_id,
entry.request.sub_requests[2].data.to_string());
simple_error_reply(entry,
fmt::format("wrong type of argument 'increment 'for '{}'", command));
return;
}
} else {
dfatal_f("command not support: {}", command);
}
if (strncasecmp(command, "DECR", 4) == 0) {
increment = -increment;
}
std::shared_ptr<proxy_session> ref_this = shared_from_this();
auto on_incr_reply = [ref_this, this, command, &entry](
::dsn::error_code ec, dsn::message_ex *, dsn::message_ex *response) {
if (_is_session_reset.load(std::memory_order_acquire)) {
dwarn_f("{}: command {} seqid({}) got reply, but session has reset",
_remote_address.to_string(),
command,
entry.sequence_id);
return;
}
if (::dsn::ERR_OK != ec) {
dwarn_f("{}: command {} seqid({}) got reply with error = {}",
_remote_address.to_string(),
command,
entry.sequence_id,
ec.to_string());
simple_error_reply(entry, ec.to_string());
} else {
::dsn::apps::incr_response incr_resp;
::dsn::unmarshall(response, incr_resp);
if (incr_resp.error != 0) {
simple_error_reply(entry, "internal error " + std::to_string(incr_resp.error));
} else {
simple_integer_reply(entry, incr_resp.new_value);
}
}
};
dsn::apps::incr_request req;
pegasus_generate_key(req.key, entry.request.sub_requests[1].data, dsn::blob());
req.increment = increment;
client->incr(req, on_incr_reply, std::chrono::milliseconds(2000), 0, pegasus_key_hash(req.key));
}
void redis_parser::parse_set_parameters(const std::vector<redis_bulk_string> &opts,
int &ttl_seconds)
{
// [EX seconds]
ttl_seconds = 0;
for (int i = 3; i < opts.size(); ++i) {
const std::string &opt = opts[i].data.to_string();
if (strcasecmp(opt.c_str(), "EX") == 0 && i + 1 < opts.size()) {
const std::string &str_ttl_seconds = opts[i + 1].data.to_string();
if (!dsn::buf2int32(str_ttl_seconds, ttl_seconds)) {
dwarn_f("'EX {}' option is error, use {}", str_ttl_seconds, ttl_seconds);
}
} else {
dwarn_f("only 'EX' option is supported");
}
}
}
void redis_parser::parse_geo_radius_parameters(const std::vector<redis_bulk_string> &opts,
int base_index,
double &radius_m,
std::string &unit,
geo::geo_client::SortType &sort_type,
int &count,
bool &WITHCOORD,
bool &WITHDIST,
bool &WITHHASH)
{
// radius
if (base_index >= opts.size()) {
return;
}
const std::string &str_radius = opts[base_index++].data.to_string();
if (!dsn::buf2double(str_radius, radius_m)) {
dwarn_f("radius parameter '{}' is error, use {}", str_radius, radius_m);
}
// m|km|ft|mi
if (base_index >= opts.size()) {
return;
}
unit = opts[base_index++].data.to_string();
if (unit == "km") {
radius_m *= 1000;
} else if (unit == "mi") {
radius_m *= 1609.344;
} else if (unit == "ft") {
radius_m *= 0.3048;
} else {
// keep as meter unit
unit = "m";
base_index--;
}
// [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC]
while (base_index < opts.size()) {
const std::string &opt = opts[base_index].data.to_string();
if (strcasecmp(opt.c_str(), "WITHCOORD") == 0) {
WITHCOORD = true;
} else if (strcasecmp(opt.c_str(), "WITHDIST") == 0) {
WITHDIST = true;
} else if (strcasecmp(opt.c_str(), "WITHHASH") == 0) {
WITHHASH = true;
} else if (strcasecmp(opt.c_str(), "COUNT") == 0 && base_index + 1 < opts.size()) {
const std::string &str_count = opts[base_index + 1].data.to_string();
if (!dsn::buf2int32(str_count, count)) {
derror_f("'COUNT {}' option is error, use {}", str_count, count);
}
} else if (strcasecmp(opt.c_str(), "ASC") == 0) {
sort_type = geo::geo_client::SortType::asc;
} else if (strcasecmp(opt.c_str(), "DESC") == 0) {
sort_type = geo::geo_client::SortType::desc;
}
base_index++;
}
}
void redis_parser::process_geo_radius_result(message_entry &entry,
const std::string &unit,
bool WITHCOORD,
bool WITHDIST,
bool WITHHASH,
int ec,
std::list<geo::SearchResult> &&results)
{
if (_is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: setex command seqid(%" PRId64 ") got reply, but session has reset",
_remote_address.to_string(),
entry.sequence_id);
return;
}
if (PERR_OK != ec) {
simple_error_reply(entry, _geo_client->get_error_string(ec));
} else {
redis_array result;
result.resize(results.size());
size_t i = 0;
for (const auto &elem : results) {
std::shared_ptr<redis_base_type> key =
std::make_shared<redis_bulk_string>(elem.hash_key); // hash_key => member
if (!WITHCOORD && !WITHDIST && !WITHHASH) {
// only member
result.array[i++] = key;
} else {
// member and some WITH* parameters
std::shared_ptr<redis_array> sub_array = std::make_shared<redis_array>();
// member
sub_array->resize(1 + (WITHDIST ? 1 : 0) + (WITHCOORD ? 1 : 0) +
(WITHHASH ? 1 : 0));
size_t index = 0;
sub_array->array[index++] = key;
// NOTE: the order of WITH* parameters should not be changed for the redis
// protocol
if (WITHDIST) {
// with distance
double distance = elem.distance;
if (unit == "km") {
distance /= 1000;
} else if (unit == "mi") {
distance /= 1609.344;
} else if (unit == "ft") {
distance /= 0.3048;
} else {
// keep as meter unit
}
sub_array->array[index++] = construct_bulk_string(distance);
}
if (WITHCOORD) {
// with coordinate
std::shared_ptr<redis_array> coordinate = std::make_shared<redis_array>();
coordinate->resize(2);
coordinate->array[0] = construct_bulk_string(elem.lng_degrees);
coordinate->array[1] = construct_bulk_string(elem.lat_degrees);
sub_array->array[index++] = coordinate;
}
if (WITHHASH) {
// with origin value
sub_array->array[index++] = std::make_shared<redis_bulk_string>(elem.value);
}
result.array[i++] = sub_array;
}
}
reply_message(entry, result);
}
}
// command format:
// GEOADD key longitude latitude member [longitude latitude member ...]
void redis_parser::geo_add(message_entry &entry)
{
if (!_geo_client) {
return simple_error_reply(entry, "redis proxy is not on GEO mode");
}
redis_request &redis_request = entry.request;
if (redis_request.sub_requests.size() < 5 || (redis_request.sub_requests.size() - 2) % 3 != 0) {
simple_error_reply(entry, "wrong number of arguments for 'geoadd' command");
return;
}
int member_count = (redis_request.sub_requests.size() - 2) / 3;
std::shared_ptr<proxy_session> ref_this = shared_from_this();
std::shared_ptr<std::atomic<int32_t>> set_count =
std::make_shared<std::atomic<int32_t>>(member_count);
std::shared_ptr<redis_integer> result(new redis_integer());
auto set_latlng_callback = [ref_this, this, &entry, result, set_count](
int error_code, pegasus_client::internal_info &&info) {
if (_is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: GEOADD command seqid(%" PRId64 ") got reply, but session has reset",
_remote_address.to_string(),
entry.sequence_id);
return;
}
if (PERR_OK != error_code) {
if (set_count->fetch_sub(1) == 1) {
reply_message(entry, *result);
}
return;
}
++(result->value);
if (set_count->fetch_sub(1) == 1) {
reply_message(entry, *result);
}
};
for (int i = 0; i < member_count; ++i) {
dsn::string_view lng_degree_str(redis_request.sub_requests[2 + i * 3].data);
dsn::string_view lat_degree_str(redis_request.sub_requests[2 + i * 3 + 1].data);
double lng_degree;
double lat_degree;
if (dsn::buf2double(lng_degree_str, lng_degree) &&
dsn::buf2double(lat_degree_str, lat_degree)) {
const std::string &hashkey = redis_request.sub_requests[2 + i * 3 + 2].data.to_string();
_geo_client->async_set(hashkey, "", lat_degree, lng_degree, set_latlng_callback, 2000);
} else if (set_count->fetch_sub(1) == 1) {
reply_message(entry, *result);
}
}
}
// command format:
// GEODIST key member1 member2 [unit]
void redis_parser::geo_dist(message_entry &entry)
{
if (!_geo_client) {
return simple_error_reply(entry, "redis proxy is not on GEO mode");
}
redis_request &redis_request = entry.request;
if (redis_request.sub_requests.size() < 4) {
simple_error_reply(entry, "wrong number of arguments for 'geodist' command");
} else {
// TODO: set the timeout
std::string hash_key1 =
redis_request.sub_requests[2].data.to_string(); // member1 => hash_key1
std::string hash_key2 =
redis_request.sub_requests[3].data.to_string(); // member2 => hash_key2
std::string unit = redis_request.sub_requests[4].data.to_string();
std::shared_ptr<proxy_session> ref_this = shared_from_this();
auto get_callback = [ref_this, this, &entry, unit](int error_code, double &&distance) {
if (_is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: GEODIST command seqid(%" PRId64 ") got reply, but session has reset",
_remote_address.to_string(),
entry.sequence_id);
return;
}
if (PERR_OK != error_code) {
simple_error_reply(entry, _geo_client->get_error_string(error_code));
} else {
if (unit == "km") {
distance /= 1000;
} else if (unit == "mi") {
distance /= 1609.344;
} else if (unit == "ft") {
distance /= 0.3048;
} else {
// keep as meter unit
}
reply_message(entry, redis_bulk_string(std::to_string(distance)));
}
};
_geo_client->async_distance(hash_key1, "", hash_key2, "", 2000, get_callback);
}
}
// command format:
// GEOPOS key member [member ...]
void redis_parser::geo_pos(message_entry &entry)
{
if (!_geo_client) {
return simple_error_reply(entry, "redis proxy is not on GEO mode");
}
redis_request &redis_request = entry.request;
if (redis_request.sub_requests.size() < 3) {
simple_error_reply(entry, "wrong number of arguments for 'geopos' command");
return;
}
int member_count = redis_request.sub_requests.size() - 2;
std::shared_ptr<proxy_session> ref_this = shared_from_this();
std::shared_ptr<std::atomic<int32_t>> get_count =
std::make_shared<std::atomic<int32_t>>(member_count);
std::shared_ptr<redis_array> result(new redis_array());
result->resize(member_count);
auto get_latlng_callback = [ref_this, this, &entry, result, get_count](
int error_code, int index, double lat_degrees, double lng_degrees) {
if (_is_session_reset.load(std::memory_order_acquire)) {
ddebug("%s: GEOPOS command seqid(%" PRId64 ") got reply, but session has reset",
_remote_address.to_string(),
entry.sequence_id);
return;
}
if (PERR_OK != error_code) {
// null bulk string for this member
result->array[index] = std::make_shared<redis_bulk_string>();
if (get_count->fetch_sub(1) == 1) {
reply_message(entry, *result);
}
return;
}
std::shared_ptr<redis_array> coordinate(new redis_array());
coordinate->resize(2);
coordinate->array[0] = construct_bulk_string(lng_degrees);
coordinate->array[1] = construct_bulk_string(lat_degrees);
result->array[index] = coordinate;
if (get_count->fetch_sub(1) == 1) {
reply_message(entry, *result);
}
};
for (int i = 0; i < member_count; ++i) {
_geo_client->async_get(
redis_request.sub_requests[i + 2].data.to_string(), "", i, get_latlng_callback, 2000);
}
}
void redis_parser::handle_command(std::unique_ptr<message_entry> &&entry)
{
message_entry &e = *entry.get();
redis_request &request = e.request;
e.sequence_id = ++s_next_seqid;
e.response.store(nullptr, std::memory_order_relaxed);
dinfo("%s: new command parsed with new seqid %" PRId64 "",
_remote_address.to_string(),
e.sequence_id);
enqueue_pending_response(std::move(entry));
dassert(request.sub_request_count > 0,
"invalid request, request.length = %d",
request.sub_request_count);
::dsn::blob &command = request.sub_requests[0].data;
redis_call_handler handler = redis_parser::get_handler(command.data(), command.length());
handler(this, e);
}
void redis_parser::redis_integer::marshalling(::dsn::binary_writer &write_stream) const
{
write_stream.write_pod(':');
std::string result = std::to_string(value);
write_stream.write(result.c_str(), (int)result.length());
write_stream.write_pod(CR);
write_stream.write_pod(LF);
}
void redis_parser::redis_simple_string::marshalling(::dsn::binary_writer &write_stream) const
{
write_stream.write_pod(is_error ? '-' : '+');
write_stream.write(message.c_str(), (int)message.length());
write_stream.write_pod(CR);
write_stream.write_pod(LF);
}
void redis_parser::redis_bulk_string::marshalling(::dsn::binary_writer &write_stream) const
{
dassert_f((-1 == length && data.length() == 0) || data.length() == length,
"{} VS {}",
data.length(),
length);
write_stream.write_pod('$');
std::string length_str = std::to_string(length);
write_stream.write(length_str.c_str(), (int)length_str.length());
write_stream.write_pod(CR);
write_stream.write_pod(LF);
if (length >= 0) {
write_stream.write(data.data(), length);
write_stream.write_pod(CR);
write_stream.write_pod(LF);
}
}
void redis_parser::redis_array::marshalling(::dsn::binary_writer &write_stream) const
{
dassert_f((-1 == count && array.size() == 0) || array.size() == count,
"{} VS {}",
array.size(),
count);
write_stream.write_pod('*');
std::string count_str = std::to_string(count);
write_stream.write(count_str.c_str(), (int)count_str.length());
write_stream.write_pod(CR);
write_stream.write_pod(LF);
for (const auto &elem : array) {
elem->marshalling(write_stream);
}
}
} // namespace proxy
} // namespace pegasus