blob: 3047159cca1e3452fafb7e70afbc5dae9c5df0fa [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <atomic>
#include <chrono>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <vector>
#include "bthread/bthread.h"
#include <gtest/gtest.h>
#include "bthread/countdown_event.h"
#include "butil/synchronization/lock.h"
#include "brpc/channel.h"
#include "brpc/redis.h"
#include "brpc/redis_cluster.h"
#include "brpc/server.h"
namespace {
const int kSplitSlot = 8191;
uint16_t HashSlot(const std::string& key) {
// Keep this aligned with implementation in redis_cluster.cpp.
static const uint16_t table[256] = {
0x0000,0x1021,0x2042,0x3063,0x4084,0x50A5,0x60C6,0x70E7,
0x8108,0x9129,0xA14A,0xB16B,0xC18C,0xD1AD,0xE1CE,0xF1EF,
0x1231,0x0210,0x3273,0x2252,0x52B5,0x4294,0x72F7,0x62D6,
0x9339,0x8318,0xB37B,0xA35A,0xD3BD,0xC39C,0xF3FF,0xE3DE,
0x2462,0x3443,0x0420,0x1401,0x64E6,0x74C7,0x44A4,0x5485,
0xA56A,0xB54B,0x8528,0x9509,0xE5EE,0xF5CF,0xC5AC,0xD58D,
0x3653,0x2672,0x1611,0x0630,0x76D7,0x66F6,0x5695,0x46B4,
0xB75B,0xA77A,0x9719,0x8738,0xF7DF,0xE7FE,0xD79D,0xC7BC,
0x48C4,0x58E5,0x6886,0x78A7,0x0840,0x1861,0x2802,0x3823,
0xC9CC,0xD9ED,0xE98E,0xF9AF,0x8948,0x9969,0xA90A,0xB92B,
0x5AF5,0x4AD4,0x7AB7,0x6A96,0x1A71,0x0A50,0x3A33,0x2A12,
0xDBFD,0xCBDC,0xFBBF,0xEB9E,0x9B79,0x8B58,0xBB3B,0xAB1A,
0x6CA6,0x7C87,0x4CE4,0x5CC5,0x2C22,0x3C03,0x0C60,0x1C41,
0xEDAE,0xFD8F,0xCDEC,0xDDCD,0xAD2A,0xBD0B,0x8D68,0x9D49,
0x7E97,0x6EB6,0x5ED5,0x4EF4,0x3E13,0x2E32,0x1E51,0x0E70,
0xFF9F,0xEFBE,0xDFDD,0xCFFC,0xBF1B,0xAF3A,0x9F59,0x8F78,
0x9188,0x81A9,0xB1CA,0xA1EB,0xD10C,0xC12D,0xF14E,0xE16F,
0x1080,0x00A1,0x30C2,0x20E3,0x5004,0x4025,0x7046,0x6067,
0x83B9,0x9398,0xA3FB,0xB3DA,0xC33D,0xD31C,0xE37F,0xF35E,
0x02B1,0x1290,0x22F3,0x32D2,0x4235,0x5214,0x6277,0x7256,
0xB5EA,0xA5CB,0x95A8,0x8589,0xF56E,0xE54F,0xD52C,0xC50D,
0x34E2,0x24C3,0x14A0,0x0481,0x7466,0x6447,0x5424,0x4405,
0xA7DB,0xB7FA,0x8799,0x97B8,0xE75F,0xF77E,0xC71D,0xD73C,
0x26D3,0x36F2,0x0691,0x16B0,0x6657,0x7676,0x4615,0x5634,
0xD94C,0xC96D,0xF90E,0xE92F,0x99C8,0x89E9,0xB98A,0xA9AB,
0x5844,0x4865,0x7806,0x6827,0x18C0,0x08E1,0x3882,0x28A3,
0xCB7D,0xDB5C,0xEB3F,0xFB1E,0x8BF9,0x9BD8,0xABBB,0xBB9A,
0x4A75,0x5A54,0x6A37,0x7A16,0x0AF1,0x1AD0,0x2AB3,0x3A92,
0xFD2E,0xED0F,0xDD6C,0xCD4D,0xBDAA,0xAD8B,0x9DE8,0x8DC9,
0x7C26,0x6C07,0x5C64,0x4C45,0x3CA2,0x2C83,0x1CE0,0x0CC1,
0xEF1F,0xFF3E,0xCF5D,0xDF7C,0xAF9B,0xBFBA,0x8FD9,0x9FF8,
0x6E17,0x7E36,0x4E55,0x5E74,0x2E93,0x3EB2,0x0ED1,0x1EF0
};
std::string hashed = key;
size_t begin = key.find('{');
if (begin != std::string::npos) {
size_t end = key.find('}', begin + 1);
if (end != std::string::npos && end > begin + 1) {
hashed = key.substr(begin + 1, end - begin - 1);
}
}
uint16_t crc = 0;
for (size_t i = 0; i < hashed.size(); ++i) {
uint8_t idx = static_cast<uint8_t>((crc >> 8) ^
static_cast<uint8_t>(hashed[i]));
crc = static_cast<uint16_t>((crc << 8) ^ table[idx]);
}
return crc & 16383;
}
int OwnerBySlot(int slot) {
return slot <= kSplitSlot ? 0 : 1;
}
struct ClusterMeta {
std::string endpoint[2];
bool fail_slots;
bool fail_nodes;
bool slots_empty_host;
std::atomic<int> slots_override_slot;
std::atomic<int> slots_override_owner;
std::atomic<bool> accept_requests_on_wrong_owner;
std::unordered_map<std::string, int> owner_override;
std::unordered_map<std::string, std::string> forced_error_by_key;
std::atomic<int> slots_calls;
std::atomic<int> nodes_calls;
std::atomic<int> moved_error_calls;
std::atomic<int> ask_error_calls;
std::string custom_nodes_payload;
bool enable_ask;
std::string ask_key;
int ask_from;
int ask_to;
std::string redirect_loop_key;
ClusterMeta()
: fail_slots(false)
, fail_nodes(false)
, slots_empty_host(false)
, slots_override_slot(-1)
, slots_override_owner(-1)
, accept_requests_on_wrong_owner(false)
, slots_calls(0)
, nodes_calls(0)
, moved_error_calls(0)
, ask_error_calls(0)
, enable_ask(false)
, ask_from(0)
, ask_to(1) {
}
int OwnerOfKey(const std::string& key) const {
std::unordered_map<std::string, int>::const_iterator it = owner_override.find(key);
if (it != owner_override.end()) {
return it->second;
}
return OwnerBySlot(HashSlot(key));
}
};
struct NodeData {
int node_id;
ClusterMeta* meta;
butil::Mutex mutex;
std::unordered_map<std::string, std::string> kv;
};
class Session : public brpc::Destroyable {
public:
Session() : asking(false) {}
void Destroy() override { delete this; }
bool asking;
};
static Session* GetOrCreateSession(brpc::RedisConnContext* ctx) {
if (ctx == NULL) {
return NULL;
}
Session* s = static_cast<Session*>(ctx->get_session());
if (s == NULL) {
s = new Session;
ctx->reset_session(s);
}
return s;
}
static bool ParseEndpoint(const std::string& endpoint, std::string* host, int* port) {
size_t pos = endpoint.rfind(':');
if (pos == std::string::npos) {
return false;
}
*host = endpoint.substr(0, pos);
*port = atoi(endpoint.substr(pos + 1).c_str());
return (*port > 0);
}
class AskingHandler : public brpc::RedisCommandHandler {
public:
brpc::RedisCommandHandlerResult Run(brpc::RedisConnContext* ctx,
const std::vector<butil::StringPiece>& /*args*/,
brpc::RedisReply* output,
bool /*flush_batched*/) override {
Session* s = GetOrCreateSession(ctx);
if (s != NULL) {
s->asking = true;
}
output->SetStatus("OK");
return brpc::REDIS_CMD_HANDLED;
}
};
class ClusterCommandHandler : public brpc::RedisCommandHandler {
public:
explicit ClusterCommandHandler(ClusterMeta* meta) : _meta(meta) {}
brpc::RedisCommandHandlerResult Run(brpc::RedisConnContext* /*ctx*/,
const std::vector<butil::StringPiece>& args,
brpc::RedisReply* output,
bool /*flush_batched*/) override {
if (args.size() < 2) {
output->SetError("ERR wrong number of arguments for 'cluster' command");
return brpc::REDIS_CMD_HANDLED;
}
if (args[1] == "slots") {
_meta->slots_calls.fetch_add(1, std::memory_order_relaxed);
if (_meta->fail_slots) {
output->SetError("ERR cluster slots disabled for test");
return brpc::REDIS_CMD_HANDLED;
}
const int override_slot = _meta->slots_override_slot.load(std::memory_order_relaxed);
const int override_owner = _meta->slots_override_owner.load(std::memory_order_relaxed);
const int default_owner = (override_slot >= 0 && override_slot <= 16383)
? OwnerBySlot(override_slot)
: -1;
if (default_owner != -1 &&
(override_owner == 0 || override_owner == 1) &&
override_owner != default_owner) {
struct SlotRange {
int start;
int end;
int owner;
SlotRange(int s, int e, int o) : start(s), end(e), owner(o) {}
};
std::vector<SlotRange> ranges;
if (override_slot <= kSplitSlot) {
if (override_slot > 0) {
ranges.push_back(SlotRange(0, override_slot - 1, 0));
}
ranges.push_back(SlotRange(override_slot, override_slot, override_owner));
if (override_slot < kSplitSlot) {
ranges.push_back(SlotRange(override_slot + 1, kSplitSlot, 0));
}
ranges.push_back(SlotRange(kSplitSlot + 1, 16383, 1));
} else {
ranges.push_back(SlotRange(0, kSplitSlot, 0));
if (override_slot > kSplitSlot + 1) {
ranges.push_back(SlotRange(kSplitSlot + 1, override_slot - 1, 1));
}
ranges.push_back(SlotRange(override_slot, override_slot, override_owner));
if (override_slot < 16383) {
ranges.push_back(SlotRange(override_slot + 1, 16383, 1));
}
}
output->SetArray(ranges.size());
for (size_t i = 0; i < ranges.size(); ++i) {
FillSlotEntry((*output)[i], ranges[i].start, ranges[i].end,
_meta->endpoint[ranges[i].owner], _meta->slots_empty_host);
}
return brpc::REDIS_CMD_HANDLED;
}
output->SetArray(2);
FillSlotEntry((*output)[0], 0, kSplitSlot, _meta->endpoint[0],
_meta->slots_empty_host);
FillSlotEntry((*output)[1], kSplitSlot + 1, 16383, _meta->endpoint[1],
_meta->slots_empty_host);
return brpc::REDIS_CMD_HANDLED;
}
if (args[1] == "nodes") {
_meta->nodes_calls.fetch_add(1, std::memory_order_relaxed);
if (_meta->fail_nodes) {
output->SetError("ERR cluster nodes disabled for test");
return brpc::REDIS_CMD_HANDLED;
}
if (!_meta->custom_nodes_payload.empty()) {
output->SetString(_meta->custom_nodes_payload);
return brpc::REDIS_CMD_HANDLED;
}
std::ostringstream oss;
oss << "node0 " << _meta->endpoint[0] << "@17000 master - 0 0 1 connected 0-"
<< kSplitSlot << "\n";
oss << "node1 " << _meta->endpoint[1] << "@17001 master - 0 0 1 connected "
<< (kSplitSlot + 1) << "-16383\n";
output->SetString(oss.str());
return brpc::REDIS_CMD_HANDLED;
}
output->SetError("ERR unsupported CLUSTER subcommand");
return brpc::REDIS_CMD_HANDLED;
}
private:
static void FillSlotEntry(brpc::RedisReply& reply, int start, int end,
const std::string& endpoint,
bool empty_host) {
std::string host;
int port = 0;
ParseEndpoint(endpoint, &host, &port);
reply.SetArray(3);
reply[0].SetInteger(start);
reply[1].SetInteger(end);
reply[2].SetArray(2);
if (empty_host) {
reply[2][0].SetString("");
} else {
reply[2][0].SetString(host);
}
reply[2][1].SetInteger(port);
}
private:
ClusterMeta* _meta;
};
class KVCommandHandler : public brpc::RedisCommandHandler {
public:
explicit KVCommandHandler(NodeData* data) : _data(data) {}
brpc::RedisCommandHandlerResult Run(brpc::RedisConnContext* ctx,
const std::vector<butil::StringPiece>& args,
brpc::RedisReply* output,
bool /*flush_batched*/) override {
if (args.empty()) {
output->SetError("ERR empty command");
return brpc::REDIS_CMD_HANDLED;
}
const std::string command = args[0].as_string();
if (command == "ping") {
output->SetStatus("PONG");
return brpc::REDIS_CMD_HANDLED;
}
if (command == "eval" || command == "evalsha") {
output->SetStatus("OK");
return brpc::REDIS_CMD_HANDLED;
}
if (args.size() < 2) {
output->SetError("ERR wrong number of arguments");
return brpc::REDIS_CMD_HANDLED;
}
const std::string key = args[1].as_string();
const int slot = HashSlot(key);
const int owner = _data->meta->OwnerOfKey(key);
std::unordered_map<std::string, std::string>::const_iterator forced =
_data->meta->forced_error_by_key.find(key);
if (forced != _data->meta->forced_error_by_key.end()) {
output->SetError(forced->second);
return brpc::REDIS_CMD_HANDLED;
}
if (!_data->meta->redirect_loop_key.empty() &&
key == _data->meta->redirect_loop_key) {
const int target = 1 - _data->node_id;
_data->meta->moved_error_calls.fetch_add(1, std::memory_order_relaxed);
output->FormatError("MOVED %d %s", slot,
_data->meta->endpoint[target].c_str());
return brpc::REDIS_CMD_HANDLED;
}
bool bypass_owner_check = false;
if (_data->meta->enable_ask && key == _data->meta->ask_key) {
if (_data->node_id == _data->meta->ask_from) {
_data->meta->ask_error_calls.fetch_add(1, std::memory_order_relaxed);
output->FormatError("ASK %d %s", slot,
_data->meta->endpoint[_data->meta->ask_to].c_str());
return brpc::REDIS_CMD_HANDLED;
}
if (_data->node_id == _data->meta->ask_to) {
Session* s = GetOrCreateSession(ctx);
if (s == NULL || !s->asking) {
output->SetError("ERR ASKING required");
return brpc::REDIS_CMD_HANDLED;
}
s->asking = false;
bypass_owner_check = true;
}
}
const bool enforce_owner =
!_data->meta->accept_requests_on_wrong_owner.load(std::memory_order_relaxed);
if (!bypass_owner_check && enforce_owner && owner != _data->node_id) {
_data->meta->moved_error_calls.fetch_add(1, std::memory_order_relaxed);
output->FormatError("MOVED %d %s", slot, _data->meta->endpoint[owner].c_str());
return brpc::REDIS_CMD_HANDLED;
}
if (command == "set") {
if (args.size() < 3) {
output->SetError("ERR wrong number of arguments for 'set' command");
return brpc::REDIS_CMD_HANDLED;
}
BAIDU_SCOPED_LOCK(_data->mutex);
_data->kv[key] = args[2].as_string();
output->SetStatus("OK");
return brpc::REDIS_CMD_HANDLED;
}
if (command == "get") {
BAIDU_SCOPED_LOCK(_data->mutex);
std::unordered_map<std::string, std::string>::iterator it = _data->kv.find(key);
if (it == _data->kv.end()) {
output->SetNullString();
} else {
output->SetString(it->second);
}
return brpc::REDIS_CMD_HANDLED;
}
if (command == "del" || command == "unlink") {
BAIDU_SCOPED_LOCK(_data->mutex);
output->SetInteger(_data->kv.erase(key) ? 1 : 0);
return brpc::REDIS_CMD_HANDLED;
}
if (command == "exists") {
BAIDU_SCOPED_LOCK(_data->mutex);
output->SetInteger(_data->kv.count(key) ? 1 : 0);
return brpc::REDIS_CMD_HANDLED;
}
output->SetError("ERR unsupported command");
return brpc::REDIS_CMD_HANDLED;
}
private:
NodeData* _data;
};
class ClusterRedisService : public brpc::RedisService {
public:
explicit ClusterRedisService(NodeData* data) {
AddCommandHandler("asking", new AskingHandler());
AddCommandHandler("cluster", new ClusterCommandHandler(data->meta));
KVCommandHandler* handler = new KVCommandHandler(data);
AddCommandHandler("ping", handler);
AddCommandHandler("get", handler);
AddCommandHandler("set", handler);
AddCommandHandler("del", handler);
AddCommandHandler("exists", handler);
AddCommandHandler("unlink", handler);
AddCommandHandler("eval", handler);
AddCommandHandler("evalsha", handler);
}
};
class Done : public google::protobuf::Closure {
public:
explicit Done(bthread::CountdownEvent* e) : _event(e) {}
void Run() override { _event->signal(); }
private:
bthread::CountdownEvent* _event;
};
class RedisClusterChannelTest : public testing::Test {
protected:
void SetUp() override {
_meta.reset(new ClusterMeta);
_node[0].meta = _meta.get();
_node[0].node_id = 0;
_node[1].meta = _meta.get();
_node[1].node_id = 1;
StartServer(0);
StartServer(1);
}
void TearDown() override {
for (int i = 0; i < 2; ++i) {
_server[i].Stop(0);
}
for (int i = 0; i < 2; ++i) {
_server[i].Join();
}
}
std::string SeedList() const {
return _meta->endpoint[0] + "," + _meta->endpoint[1];
}
void InitChannel(brpc::RedisClusterChannel* channel, int max_redirect = 5) {
brpc::RedisClusterChannelOptions options;
options.max_redirect = max_redirect;
options.enable_periodic_refresh = false;
ASSERT_EQ(0, channel->Init(SeedList(), &options));
}
std::string FindKeyForNode(int node_id) const {
for (int i = 0; i < 200000; ++i) {
std::ostringstream oss;
oss << "key_" << node_id << '_' << i;
if (OwnerBySlot(HashSlot(oss.str())) == node_id) {
return oss.str();
}
}
return "fallback_key";
}
std::vector<std::string> FindKeysForNode(int node_id, size_t count) const {
std::vector<std::string> keys;
keys.reserve(count);
for (int i = 0; i < 400000 && keys.size() < count; ++i) {
std::ostringstream oss;
oss << "key_batch_" << node_id << '_' << i;
if (OwnerBySlot(HashSlot(oss.str())) == node_id) {
keys.push_back(oss.str());
}
}
return keys;
}
std::string FindHashTagForNode(int node_id) const {
for (int i = 0; i < 200000; ++i) {
std::ostringstream oss;
oss << "tag_" << node_id << '_' << i;
const std::string key = "{" + oss.str() + "}";
if (OwnerBySlot(HashSlot(key)) == node_id) {
return oss.str();
}
}
return "fallback_tag";
}
private:
void StartServer(int index) {
brpc::ServerOptions options;
options.redis_service = new ClusterRedisService(&_node[index]);
brpc::PortRange range(20000 + index * 1000, 20999 + index * 1000);
ASSERT_EQ(0, _server[index].Start("127.0.0.1", range, &options));
std::ostringstream oss;
oss << "127.0.0.1:" << _server[index].listen_address().port;
_meta->endpoint[index] = oss.str();
}
protected:
std::unique_ptr<ClusterMeta> _meta;
NodeData _node[2];
brpc::Server _server[2];
};
TEST_F(RedisClusterChannelTest, basic_routing_and_multi_key_commands) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
const std::string key0 = FindKeyForNode(0);
const std::string key1 = FindKeyForNode(1);
{
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("mset %s v0 %s v1", key0.c_str(), key1.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_string());
ASSERT_EQ("OK", resp.reply(0).data());
}
{
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("mget %s %s", key0.c_str(), key1.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_array());
ASSERT_EQ(2u, resp.reply(0).size());
ASSERT_EQ("v0", resp.reply(0)[0].data());
ASSERT_EQ("v1", resp.reply(0)[1].data());
}
{
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("exists %s %s", key0.c_str(), key1.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(2, resp.reply(0).integer());
}
{
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("del %s %s", key0.c_str(), key1.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(2, resp.reply(0).integer());
}
}
TEST_F(RedisClusterChannelTest, moved_redirection) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
const std::string moved_key = FindKeyForNode(0);
_meta->owner_override[moved_key] = 1;
{
BAIDU_SCOPED_LOCK(_node[1].mutex);
_node[1].kv[moved_key] = "moved-value";
}
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("get %s", moved_key.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_string());
ASSERT_EQ("moved-value", resp.reply(0).data());
}
TEST_F(RedisClusterChannelTest, ask_redirection) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
_meta->enable_ask = true;
_meta->ask_from = 0;
_meta->ask_to = 1;
_meta->ask_key = FindKeyForNode(0);
{
BAIDU_SCOPED_LOCK(_node[1].mutex);
_node[1].kv[_meta->ask_key] = "ask-value";
}
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("get %s", _meta->ask_key.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_string());
ASSERT_EQ("ask-value", resp.reply(0).data());
}
TEST_F(RedisClusterChannelTest, ask_redirection_does_not_override_slot_cache) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
_meta->enable_ask = true;
_meta->ask_from = 0;
_meta->ask_to = 1;
_meta->ask_key = FindKeyForNode(0);
{
BAIDU_SCOPED_LOCK(_node[1].mutex);
_node[1].kv[_meta->ask_key] = "ask-value";
}
for (int i = 0; i < 5; ++i) {
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("get %s", _meta->ask_key.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_string());
ASSERT_EQ("ask-value", resp.reply(0).data());
}
}
TEST_F(RedisClusterChannelTest, cluster_nodes_fallback) {
_meta->fail_slots = true;
brpc::RedisClusterChannel channel;
InitChannel(&channel);
const std::string key = FindKeyForNode(1);
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("set %s vv", key.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_TRUE(resp.reply(0).is_string());
ASSERT_EQ("OK", resp.reply(0).data());
}
TEST_F(RedisClusterChannelTest, eval_and_evalsha) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
const std::string key0 = FindKeyForNode(0);
const std::string key1 = FindKeyForNode(1);
{
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
const butil::StringPiece parts[] = {
"eval", "return 1", "2", key0, key1
};
ASSERT_TRUE(req.AddCommandByComponents(parts, sizeof(parts) / sizeof(parts[0])));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_error());
ASSERT_NE(std::string::npos,
std::string(resp.reply(0).error_message()).find("CROSSSLOT"));
}
{
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("evalsha abcdef 1 %s arg1", key0.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_string());
ASSERT_EQ("OK", resp.reply(0).data());
}
}
TEST_F(RedisClusterChannelTest, redirect_retry_limit) {
brpc::RedisClusterChannel channel;
InitChannel(&channel, 3);
_meta->redirect_loop_key = FindKeyForNode(0);
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("get %s", _meta->redirect_loop_key.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_TRUE(cntl.Failed());
ASSERT_NE(std::string::npos, cntl.ErrorText().find("redirect"));
}
TEST_F(RedisClusterChannelTest, async_call) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
const std::string key = FindKeyForNode(1);
{
BAIDU_SCOPED_LOCK(_node[1].mutex);
_node[1].kv[key] = "async-value";
}
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("get %s", key.c_str()));
bthread::CountdownEvent event(1);
Done done(&event);
channel.CallMethod(NULL, &cntl, &req, &resp, &done);
event.wait();
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_string());
ASSERT_EQ("async-value", resp.reply(0).data());
}
TEST_F(RedisClusterChannelTest, pipeline_order_with_mixed_commands) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
const std::string key0 = FindKeyForNode(0);
const std::string key1 = FindKeyForNode(1);
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("set %s va", key0.c_str()));
ASSERT_TRUE(req.AddCommand("set %s vb", key1.c_str()));
ASSERT_TRUE(req.AddCommand("mget %s %s", key0.c_str(), key1.c_str()));
ASSERT_TRUE(req.AddCommand("exists %s %s", key0.c_str(), key1.c_str()));
ASSERT_TRUE(req.AddCommand("unlink %s %s", key0.c_str(), key1.c_str()));
ASSERT_TRUE(req.AddCommand("mget %s %s", key0.c_str(), key1.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(6, resp.reply_size());
ASSERT_EQ("OK", resp.reply(0).data());
ASSERT_EQ("OK", resp.reply(1).data());
ASSERT_TRUE(resp.reply(2).is_array());
ASSERT_EQ(2u, resp.reply(2).size());
ASSERT_EQ("va", resp.reply(2)[0].data());
ASSERT_EQ("vb", resp.reply(2)[1].data());
ASSERT_TRUE(resp.reply(3).is_integer());
ASSERT_EQ(2, resp.reply(3).integer());
ASSERT_TRUE(resp.reply(4).is_integer());
ASSERT_EQ(2, resp.reply(4).integer());
ASSERT_TRUE(resp.reply(5).is_array());
ASSERT_EQ(2u, resp.reply(5).size());
ASSERT_TRUE(resp.reply(5)[0].is_nil());
ASSERT_TRUE(resp.reply(5)[1].is_nil());
}
TEST_F(RedisClusterChannelTest, transaction_commands_are_not_supported) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("multi"));
ASSERT_TRUE(req.AddCommand("exec"));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(2, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_error());
ASSERT_TRUE(resp.reply(1).is_error());
ASSERT_NE(std::string::npos,
std::string(resp.reply(0).error_message()).find("not supported"));
}
TEST_F(RedisClusterChannelTest, eval_argument_validation) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
{
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
const butil::StringPiece parts[] = {
"eval", "return 1", "abc", "k1"
};
ASSERT_TRUE(req.AddCommandByComponents(parts, sizeof(parts) / sizeof(parts[0])));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_error());
ASSERT_NE(std::string::npos,
std::string(resp.reply(0).error_message()).find("invalid numkeys"));
}
{
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
const butil::StringPiece parts[] = {
"eval", "return 1", "2", "k1"
};
ASSERT_TRUE(req.AddCommandByComponents(parts, sizeof(parts) / sizeof(parts[0])));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_error());
ASSERT_NE(std::string::npos,
std::string(resp.reply(0).error_message()).find("not enough keys"));
}
}
TEST_F(RedisClusterChannelTest, async_failure_propagation) {
brpc::RedisClusterChannel channel;
InitChannel(&channel, 1);
_meta->redirect_loop_key = FindKeyForNode(0);
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("get %s", _meta->redirect_loop_key.c_str()));
bthread::CountdownEvent event(1);
Done done(&event);
channel.CallMethod(NULL, &cntl, &req, &resp, &done);
event.wait();
ASSERT_TRUE(cntl.Failed());
ASSERT_NE(std::string::npos, cntl.ErrorText().find("redirect"));
}
TEST_F(RedisClusterChannelTest, max_redirect_zero_fails_on_single_redirect) {
brpc::RedisClusterChannel channel;
InitChannel(&channel, 0);
const std::string key = FindKeyForNode(0);
_meta->owner_override[key] = 1;
{
BAIDU_SCOPED_LOCK(_node[1].mutex);
_node[1].kv[key] = "value-on-node1";
}
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("get %s", key.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_TRUE(cntl.Failed());
ASSERT_NE(std::string::npos, cntl.ErrorText().find("redirect"));
}
TEST_F(RedisClusterChannelTest, redirect_with_refresh_failure_still_returns_reply) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
const std::string key = FindKeyForNode(0);
_meta->owner_override[key] = 1;
{
BAIDU_SCOPED_LOCK(_node[1].mutex);
_node[1].kv[key] = "moved-value";
}
const int before_slots = _meta->slots_calls.load(std::memory_order_relaxed);
const int before_nodes = _meta->nodes_calls.load(std::memory_order_relaxed);
_meta->fail_slots = true;
_meta->fail_nodes = true;
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("get %s", key.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_TRUE(resp.reply(0).is_string());
ASSERT_EQ("moved-value", resp.reply(0).data());
ASSERT_GT(_meta->slots_calls.load(std::memory_order_relaxed), before_slots);
ASSERT_GT(_meta->nodes_calls.load(std::memory_order_relaxed), before_nodes);
}
TEST_F(RedisClusterChannelTest, pipeline_with_ask_and_moved_keeps_order) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
const std::string ask_key = FindKeyForNode(0);
std::string moved_key;
for (int i = 0; i < 200000; ++i) {
std::ostringstream oss;
oss << "moved_key_" << i;
if (OwnerBySlot(HashSlot(oss.str())) == 0 && oss.str() != ask_key) {
moved_key = oss.str();
break;
}
}
ASSERT_FALSE(moved_key.empty());
_meta->enable_ask = true;
_meta->ask_from = 0;
_meta->ask_to = 1;
_meta->ask_key = ask_key;
_meta->owner_override[moved_key] = 1;
{
BAIDU_SCOPED_LOCK(_node[1].mutex);
_node[1].kv[ask_key] = "ask-value";
_node[1].kv[moved_key] = "moved-value";
}
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("get %s", ask_key.c_str()));
ASSERT_TRUE(req.AddCommand("get %s", moved_key.c_str()));
ASSERT_TRUE(req.AddCommand("get %s", ask_key.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(3, resp.reply_size());
ASSERT_EQ("ask-value", resp.reply(0).data());
ASSERT_EQ("moved-value", resp.reply(1).data());
ASSERT_EQ("ask-value", resp.reply(2).data());
}
TEST_F(RedisClusterChannelTest, fallback_to_nodes_then_recover_to_slots) {
_meta->fail_slots = true;
brpc::RedisClusterChannel channel;
InitChannel(&channel);
ASSERT_GT(_meta->nodes_calls.load(std::memory_order_relaxed), 0);
_meta->fail_slots = false;
const int before_slots = _meta->slots_calls.load(std::memory_order_relaxed);
const std::string key = FindKeyForNode(0);
_meta->owner_override[key] = 1;
{
BAIDU_SCOPED_LOCK(_node[1].mutex);
_node[1].kv[key] = "recover-value";
}
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("get %s", key.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ("recover-value", resp.reply(0).data());
ASSERT_GT(_meta->slots_calls.load(std::memory_order_relaxed), before_slots);
}
TEST_F(RedisClusterChannelTest, cluster_slots_empty_host_uses_seed_host) {
_meta->slots_empty_host = true;
brpc::RedisClusterChannel channel;
InitChannel(&channel);
const std::string key = FindKeyForNode(1);
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("set %s host-fallback-value", key.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ("OK", resp.reply(0).data());
}
TEST_F(RedisClusterChannelTest, init_accepts_whitespace_in_seed_list) {
brpc::RedisClusterChannel channel;
brpc::RedisClusterChannelOptions options;
options.enable_periodic_refresh = false;
const std::string seeds = " " + _meta->endpoint[0] + " , " + _meta->endpoint[1] + " ";
ASSERT_EQ(0, channel.Init(seeds, &options));
}
TEST_F(RedisClusterChannelTest, init_with_invalid_seed_tokens_should_fail) {
brpc::RedisClusterChannel channel;
ASSERT_NE(0, channel.Init(" , , "));
}
TEST_F(RedisClusterChannelTest, init_fails_when_cluster_topology_unavailable) {
_meta->fail_slots = true;
_meta->fail_nodes = true;
brpc::RedisClusterChannel channel;
brpc::RedisClusterChannelOptions options;
options.enable_periodic_refresh = false;
ASSERT_NE(0, channel.Init(SeedList(), &options));
}
TEST_F(RedisClusterChannelTest, ping_without_key_uses_any_endpoint) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("ping"));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_string());
ASSERT_EQ("PONG", resp.reply(0).data());
}
TEST_F(RedisClusterChannelTest, wrong_argument_count_commands_return_error_reply) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("mget"));
ASSERT_TRUE(req.AddCommand("mset only_key"));
ASSERT_TRUE(req.AddCommand("del"));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(3, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_error());
ASSERT_TRUE(resp.reply(1).is_error());
ASSERT_TRUE(resp.reply(2).is_error());
}
TEST_F(RedisClusterChannelTest, malformed_redirect_error_is_returned_directly) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
const std::string key = FindKeyForNode(0);
_meta->forced_error_by_key[key] = "MOVED not_a_slot bad_endpoint";
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("get %s", key.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_error());
ASSERT_EQ("MOVED not_a_slot bad_endpoint",
std::string(resp.reply(0).error_message()));
}
TEST_F(RedisClusterChannelTest, cluster_nodes_parser_ignores_migration_tokens) {
_meta->fail_slots = true;
std::ostringstream nodes;
nodes << "node0 " << _meta->endpoint[0]
<< "@17000 master - 0 0 1 connected 0-" << kSplitSlot
<< " [100->-node1]\n";
nodes << "node1 " << _meta->endpoint[1]
<< "@17001 master - 0 0 1 connected " << (kSplitSlot + 1)
<< "-16383 [100-<-node0]\n";
_meta->custom_nodes_payload = nodes.str();
brpc::RedisClusterChannel channel;
InitChannel(&channel);
const std::string key = FindKeyForNode(1);
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("set %s from-nodes", key.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ("OK", resp.reply(0).data());
}
TEST_F(RedisClusterChannelTest, eval_numkeys_zero_routes_without_slot) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
const butil::StringPiece parts[] = {
"eval", "return 'ok'", "0", "arg1"
};
ASSERT_TRUE(req.AddCommandByComponents(parts, sizeof(parts) / sizeof(parts[0])));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_string());
ASSERT_EQ("OK", resp.reply(0).data());
}
TEST_F(RedisClusterChannelTest, mset_stops_after_subcommand_error) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
std::vector<std::string> keys0 = FindKeysForNode(0, 2);
ASSERT_EQ(2u, keys0.size());
const std::string key_ok = keys0[0];
const std::string key_tail = keys0[1];
const std::string key_err = FindKeyForNode(1);
ASSERT_NE(key_ok, key_err);
ASSERT_NE(key_tail, key_err);
_meta->forced_error_by_key[key_err] = "ERR injected set failure";
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("mset %s v0 %s v1 %s v2",
key_ok.c_str(),
key_err.c_str(),
key_tail.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_error());
ASSERT_NE(std::string::npos,
std::string(resp.reply(0).error_message()).find("injected set failure"));
{
brpc::RedisRequest get_req;
brpc::RedisResponse get_resp;
brpc::Controller get_cntl;
ASSERT_TRUE(get_req.AddCommand("get %s", key_ok.c_str()));
channel.CallMethod(NULL, &get_cntl, &get_req, &get_resp, NULL);
ASSERT_FALSE(get_cntl.Failed()) << get_cntl.ErrorText();
ASSERT_TRUE(get_resp.reply(0).is_string());
ASSERT_EQ("v0", get_resp.reply(0).data());
}
{
brpc::RedisRequest get_req;
brpc::RedisResponse get_resp;
brpc::Controller get_cntl;
ASSERT_TRUE(get_req.AddCommand("get %s", key_tail.c_str()));
channel.CallMethod(NULL, &get_cntl, &get_req, &get_resp, NULL);
ASSERT_FALSE(get_cntl.Failed()) << get_cntl.ErrorText();
ASSERT_TRUE(get_resp.reply(0).is_nil());
}
}
TEST_F(RedisClusterChannelTest, integer_aggregate_stops_after_subcommand_error) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
const std::string key0 = FindKeyForNode(0);
std::vector<std::string> keys1 = FindKeysForNode(1, 2);
ASSERT_EQ(2u, keys1.size());
const std::string key_err = keys1[0];
const std::string key_tail = keys1[1];
{
BAIDU_SCOPED_LOCK(_node[0].mutex);
_node[0].kv[key0] = "v0";
}
{
BAIDU_SCOPED_LOCK(_node[1].mutex);
_node[1].kv[key_err] = "verr";
_node[1].kv[key_tail] = "vtail";
}
_meta->forced_error_by_key[key_err] = "ERR injected unlink failure";
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("unlink %s %s %s",
key0.c_str(), key_err.c_str(), key_tail.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_error());
ASSERT_NE(std::string::npos,
std::string(resp.reply(0).error_message()).find("injected unlink failure"));
brpc::RedisRequest get_req;
brpc::RedisResponse get_resp;
brpc::Controller get_cntl;
ASSERT_TRUE(get_req.AddCommand("get %s", key_tail.c_str()));
channel.CallMethod(NULL, &get_cntl, &get_req, &get_resp, NULL);
ASSERT_FALSE(get_cntl.Failed()) << get_cntl.ErrorText();
ASSERT_TRUE(get_resp.reply(0).is_string());
ASSERT_EQ("vtail", get_resp.reply(0).data());
}
TEST_F(RedisClusterChannelTest, async_concurrent_calls_with_mixed_redirections) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
std::vector<std::string> keys0 = FindKeysForNode(0, 2);
ASSERT_EQ(2u, keys0.size());
const std::string ask_key = keys0[0];
const std::string moved_key = keys0[1];
const std::string normal_key = FindKeyForNode(1);
_meta->enable_ask = true;
_meta->ask_from = 0;
_meta->ask_to = 1;
_meta->ask_key = ask_key;
_meta->owner_override[moved_key] = 1;
{
BAIDU_SCOPED_LOCK(_node[1].mutex);
_node[1].kv[ask_key] = "ask-v";
_node[1].kv[moved_key] = "moved-v";
_node[1].kv[normal_key] = "normal-v";
}
const int req_count = 60;
bthread::CountdownEvent event(req_count);
std::vector<std::unique_ptr<brpc::RedisRequest> > requests(req_count);
std::vector<std::unique_ptr<brpc::RedisResponse> > responses(req_count);
std::vector<std::unique_ptr<brpc::Controller> > controllers(req_count);
std::vector<std::unique_ptr<Done> > dones(req_count);
std::vector<std::string> expected(req_count);
for (int i = 0; i < req_count; ++i) {
requests[i].reset(new brpc::RedisRequest);
responses[i].reset(new brpc::RedisResponse);
controllers[i].reset(new brpc::Controller);
dones[i].reset(new Done(&event));
std::string key;
if (i % 3 == 0) {
key = ask_key;
expected[i] = "ask-v";
} else if (i % 3 == 1) {
key = moved_key;
expected[i] = "moved-v";
} else {
key = normal_key;
expected[i] = "normal-v";
}
ASSERT_TRUE(requests[i]->AddCommand("get %s", key.c_str()));
channel.CallMethod(NULL,
controllers[i].get(),
requests[i].get(),
responses[i].get(),
dones[i].get());
}
event.wait();
for (int i = 0; i < req_count; ++i) {
ASSERT_FALSE(controllers[i]->Failed()) << controllers[i]->ErrorText();
ASSERT_EQ(1, responses[i]->reply_size());
ASSERT_TRUE(responses[i]->reply(0).is_string());
ASSERT_EQ(expected[i], responses[i]->reply(0).data());
}
ASSERT_GT(_meta->ask_error_calls.load(std::memory_order_relaxed), 0);
ASSERT_GT(_meta->moved_error_calls.load(std::memory_order_relaxed), 0);
}
TEST_F(RedisClusterChannelTest, hashtag_keys_route_for_multi_key_commands) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
const std::string tag = FindHashTagForNode(1);
const std::string key0 = "k0{" + tag + "}suffix";
const std::string key1 = "k1{" + tag + "}suffix";
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("mset %s v0 %s v1", key0.c_str(), key1.c_str()));
ASSERT_TRUE(req.AddCommand("mget %s %s", key0.c_str(), key1.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(2, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_string());
ASSERT_EQ("OK", resp.reply(0).data());
ASSERT_TRUE(resp.reply(1).is_array());
ASSERT_EQ("v0", resp.reply(1)[0].data());
ASSERT_EQ("v1", resp.reply(1)[1].data());
ASSERT_EQ(0, _meta->moved_error_calls.load(std::memory_order_relaxed));
}
TEST_F(RedisClusterChannelTest, missing_key_get_returns_nil_reply) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
const std::string key = FindKeyForNode(0);
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("get %s", key.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_nil());
}
TEST_F(RedisClusterChannelTest, pipeline_with_string_nil_error_and_string) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
const std::string key_ok = FindKeyForNode(1);
const std::string key_nil = FindKeyForNode(0);
std::string key_err;
for (int i = 0; i < 200000; ++i) {
std::ostringstream oss;
oss << "err_key_" << i;
if (OwnerBySlot(HashSlot(oss.str())) == 1 && oss.str() != key_ok) {
key_err = oss.str();
break;
}
}
ASSERT_FALSE(key_err.empty());
{
BAIDU_SCOPED_LOCK(_node[1].mutex);
_node[1].kv[key_ok] = "ok-value";
}
_meta->forced_error_by_key[key_err] = "ERR injected pipeline error";
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("get %s", key_ok.c_str()));
ASSERT_TRUE(req.AddCommand("get %s", key_nil.c_str()));
ASSERT_TRUE(req.AddCommand("get %s", key_err.c_str()));
ASSERT_TRUE(req.AddCommand("get %s", key_ok.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(4, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_string());
ASSERT_EQ("ok-value", resp.reply(0).data());
ASSERT_TRUE(resp.reply(1).is_nil());
ASSERT_TRUE(resp.reply(2).is_error());
ASSERT_NE(std::string::npos,
std::string(resp.reply(2).error_message()).find("injected pipeline error"));
ASSERT_TRUE(resp.reply(3).is_string());
ASSERT_EQ("ok-value", resp.reply(3).data());
}
TEST_F(RedisClusterChannelTest, empty_request_should_fail) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_TRUE(cntl.Failed());
ASSERT_NE(std::string::npos, cntl.ErrorText().find("no redis command"));
}
TEST_F(RedisClusterChannelTest, pipeline_continues_after_command_error_reply) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
const std::string key_ok = FindKeyForNode(1);
const std::string key_err = FindKeyForNode(0);
_meta->forced_error_by_key[key_err] = "ERR injected get failure";
{
BAIDU_SCOPED_LOCK(_node[1].mutex);
_node[1].kv[key_ok] = "ok-value";
}
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("get %s", key_err.c_str()));
ASSERT_TRUE(req.AddCommand("get %s", key_ok.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(2, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_error());
ASSERT_NE(std::string::npos,
std::string(resp.reply(0).error_message()).find("injected get failure"));
ASSERT_TRUE(resp.reply(1).is_string());
ASSERT_EQ("ok-value", resp.reply(1).data());
}
TEST_F(RedisClusterChannelTest, redirect_updates_slot_cache_even_when_refresh_fails) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
const std::string key = FindKeyForNode(0);
_meta->owner_override[key] = 1;
{
BAIDU_SCOPED_LOCK(_node[1].mutex);
_node[1].kv[key] = "value-on-node1";
}
_meta->fail_slots = true;
_meta->fail_nodes = true;
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("get %s", key.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ("value-on-node1", resp.reply(0).data());
brpc::RedisRequest req2;
brpc::RedisResponse resp2;
brpc::Controller cntl2;
ASSERT_TRUE(req2.AddCommand("get %s", key.c_str()));
channel.CallMethod(NULL, &cntl2, &req2, &resp2, NULL);
ASSERT_FALSE(cntl2.Failed()) << cntl2.ErrorText();
ASSERT_EQ("value-on-node1", resp2.reply(0).data());
ASSERT_EQ(1, _meta->moved_error_calls.load(std::memory_order_relaxed));
}
TEST_F(RedisClusterChannelTest, periodic_refresh_fallbacks_to_nodes_when_slots_fail) {
brpc::RedisClusterChannel channel;
brpc::RedisClusterChannelOptions options;
options.enable_periodic_refresh = true;
options.refresh_interval_s = 1;
options.max_redirect = 5;
ASSERT_EQ(0, channel.Init(SeedList(), &options));
_meta->fail_slots = true;
const int before_nodes = _meta->nodes_calls.load(std::memory_order_relaxed);
bool nodes_used = false;
for (int i = 0; i < 30; ++i) {
if (_meta->nodes_calls.load(std::memory_order_relaxed) > before_nodes) {
nodes_used = true;
break;
}
bthread_usleep(100000);
}
ASSERT_TRUE(nodes_used);
}
TEST_F(RedisClusterChannelTest, periodic_refresh_updates_slot_cache_on_topology_change) {
brpc::RedisClusterChannel channel;
brpc::RedisClusterChannelOptions options;
options.enable_periodic_refresh = true;
options.refresh_interval_s = 1;
options.max_redirect = 5;
ASSERT_EQ(0, channel.Init(SeedList(), &options));
_meta->accept_requests_on_wrong_owner.store(true, std::memory_order_relaxed);
const std::string key = FindKeyForNode(0);
const int slot = static_cast<int>(HashSlot(key));
const std::string value_by_owner[2] = {"value-on-node0", "value-on-node1"};
{
BAIDU_SCOPED_LOCK(_node[0].mutex);
_node[0].kv[key] = value_by_owner[0];
}
{
BAIDU_SCOPED_LOCK(_node[1].mutex);
_node[1].kv[key] = value_by_owner[1];
}
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("get %s", key.c_str()));
channel.CallMethod(NULL, &cntl, &req, &resp, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(1, resp.reply_size());
ASSERT_TRUE(resp.reply(0).is_string());
ASSERT_EQ(value_by_owner[0], resp.reply(0).data());
const int before_slots = _meta->slots_calls.load(std::memory_order_relaxed);
const int target_owner = 1 - OwnerBySlot(slot);
_meta->slots_override_slot.store(slot, std::memory_order_relaxed);
_meta->slots_override_owner.store(target_owner, std::memory_order_relaxed);
bool updated = false;
for (int i = 0; i < 50; ++i) {
brpc::RedisRequest req2;
brpc::RedisResponse resp2;
brpc::Controller cntl2;
ASSERT_TRUE(req2.AddCommand("get %s", key.c_str()));
channel.CallMethod(NULL, &cntl2, &req2, &resp2, NULL);
ASSERT_FALSE(cntl2.Failed()) << cntl2.ErrorText();
ASSERT_EQ(1, resp2.reply_size());
ASSERT_TRUE(resp2.reply(0).is_string());
if (resp2.reply(0).data() == value_by_owner[target_owner]) {
updated = true;
break;
}
bthread_usleep(100000);
}
ASSERT_TRUE(updated);
ASSERT_GT(_meta->slots_calls.load(std::memory_order_relaxed), before_slots);
ASSERT_EQ(0, _meta->moved_error_calls.load(std::memory_order_relaxed));
}
TEST_F(RedisClusterChannelTest, async_pipeline_mixed_commands) {
brpc::RedisClusterChannel channel;
InitChannel(&channel);
const std::string key0 = FindKeyForNode(0);
const std::string key1 = FindKeyForNode(1);
brpc::RedisRequest req;
brpc::RedisResponse resp;
brpc::Controller cntl;
ASSERT_TRUE(req.AddCommand("set %s p0", key0.c_str()));
ASSERT_TRUE(req.AddCommand("set %s p1", key1.c_str()));
ASSERT_TRUE(req.AddCommand("mget %s %s", key0.c_str(), key1.c_str()));
ASSERT_TRUE(req.AddCommand("del %s %s", key0.c_str(), key1.c_str()));
bthread::CountdownEvent event(1);
Done done(&event);
channel.CallMethod(NULL, &cntl, &req, &resp, &done);
event.wait();
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(4, resp.reply_size());
ASSERT_EQ("OK", resp.reply(0).data());
ASSERT_EQ("OK", resp.reply(1).data());
ASSERT_TRUE(resp.reply(2).is_array());
ASSERT_EQ("p0", resp.reply(2)[0].data());
ASSERT_EQ("p1", resp.reply(2)[1].data());
ASSERT_TRUE(resp.reply(3).is_integer());
ASSERT_EQ(2, resp.reply(3).integer());
}
TEST_F(RedisClusterChannelTest, periodic_refresh_updates_topology_in_background) {
brpc::RedisClusterChannel channel;
brpc::RedisClusterChannelOptions options;
options.enable_periodic_refresh = true;
options.refresh_interval_s = 1;
options.max_redirect = 5;
ASSERT_EQ(0, channel.Init(SeedList(), &options));
const int initial_slots_calls = _meta->slots_calls.load(std::memory_order_relaxed);
bool refreshed = false;
for (int i = 0; i < 30; ++i) {
if (_meta->slots_calls.load(std::memory_order_relaxed) > initial_slots_calls) {
refreshed = true;
break;
}
bthread_usleep(100000);
}
ASSERT_TRUE(refreshed);
}
TEST_F(RedisClusterChannelTest, periodic_refresh_thread_stops_quickly_on_destroy) {
typedef std::chrono::steady_clock Clock;
const Clock::time_point begin = Clock::now();
{
brpc::RedisClusterChannel channel;
brpc::RedisClusterChannelOptions options;
options.enable_periodic_refresh = true;
options.refresh_interval_s = 30;
ASSERT_EQ(0, channel.Init(SeedList(), &options));
}
const Clock::time_point end = Clock::now();
const int64_t elapsed_ms =
std::chrono::duration_cast<std::chrono::milliseconds>(end - begin).count();
ASSERT_LT(elapsed_ms, 2000);
}
TEST_F(RedisClusterChannelTest, init_with_empty_seed_should_fail) {
brpc::RedisClusterChannel channel;
ASSERT_NE(0, channel.Init(""));
}
} // namespace