blob: 7649b3b41c02f550f4cff958ba3b8554d4b3f6f0 [file] [log] [blame]
/**
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#define CATCH_CONFIG_EXTERNAL_INTERFACES
#include <catch.hpp> /* catch unit-test framework */
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <stdio.h>
#include <thread>
#include <future>
#include <chrono>
#include <fstream>
#include <tscore/BufferWriter.h>
#include <tscore/ts_file.h>
#include "ts/ts.h"
#include "rpc/jsonrpc/JsonRPC.h"
#include "rpc/server/RPCServer.h"
#include "rpc/server/IPCSocketServer.h"
#include "shared/rpc/IPCSocketClient.h"
#include "I_EventSystem.h"
#include "tscore/I_Layout.h"
#include "diags.i"
#define DEFINE_JSONRPC_PROTO_FUNCTION(fn) ts::Rv<YAML::Node> fn(std::string_view const &id, const YAML::Node &params)
namespace fs = ts::file;
namespace rpc
{
bool
test_remove_handler(std::string_view name)
{
return rpc::JsonRPCManager::instance().remove_handler(name);
}
template <typename Func>
inline bool
add_method_handler(const std::string &name, Func &&call)
{
return rpc::JsonRPCManager::instance().add_method_handler(name, std::forward<Func>(call), nullptr, {});
}
} // namespace rpc
static const std::string sockPath{"/tmp/jsonrpc20_test.sock"};
static const std::string lockPath{"/tmp/jsonrpc20_test.lock"};
static constexpr int default_backlog{5};
static constexpr int default_maxRetriesOnTransientErrors{64};
static constexpr auto logTag{"rpc.test.client"};
struct RPCServerTestListener : Catch::TestEventListenerBase {
using TestEventListenerBase::TestEventListenerBase; // inherit constructor
~RPCServerTestListener();
// The whole test run starting
void
testRunStarting(Catch::TestRunInfo const &testRunInfo) override
{
Layout::create();
init_diags("rpc|rpc.test", nullptr);
RecProcessInit(RECM_STAND_ALONE);
signal(SIGPIPE, SIG_IGN);
ink_event_system_init(EVENT_SYSTEM_MODULE_PUBLIC_VERSION);
eventProcessor.start(2, 1048576);
// EThread *main_thread = new EThread;
main_thread = std::make_unique<EThread>();
main_thread->set_specific();
rpc::config::RPCConfig serverConfig;
auto confStr{R"({"rpc": { "enabled": true, "unix": { "lock_path_name": ")" + lockPath + R"(", "sock_path_name": ")" + sockPath +
R"(", "backlog": 5,"max_retry_on_transient_errors": 64 }}})"};
YAML::Node configNode = YAML::Load(confStr);
serverConfig.load(configNode["rpc"]);
try {
jsonrpcServer = new rpc::RPCServer(serverConfig);
jsonrpcServer->start_thread();
} catch (std::exception const &ex) {
Debug(logTag, "Oops: %s", ex.what());
}
}
// The whole test run ending
void
testRunEnded(Catch::TestRunStats const &testRunStats) override
{
// jsonrpcServer->stop_thread();
// delete main_thread;
if (jsonrpcServer) {
delete jsonrpcServer;
}
}
private:
// std::unique_ptr<rpc::RPCServer> jrpcServer;
std::unique_ptr<EThread> main_thread;
};
CATCH_REGISTER_LISTENER(RPCServerTestListener)
RPCServerTestListener::~RPCServerTestListener() {}
DEFINE_JSONRPC_PROTO_FUNCTION(some_foo) // id, params
{
ts::Rv<YAML::Node> resp;
int dur{1};
try {
dur = params["duration"].as<int>();
} catch (...) {
}
INFO("Sleeping for " << dur << "s");
std::this_thread::sleep_for(std::chrono::seconds(dur));
resp.result()["res"] = "ok";
resp.result()["duration"] = dur;
INFO("Done sleeping");
return resp;
}
namespace
{
/* Create and return a path to a temporary sandbox directory. */
fs::path
getTemporaryDir()
{
std::error_code ec;
fs::path tmpDir = fs::canonical(fs::temp_directory_path(), ec);
tmpDir /= "sandbox_XXXXXX";
char dirNameTemplate[tmpDir.string().length() + 1];
sprintf(dirNameTemplate, "%s", tmpDir.c_str());
return fs::path(mkdtemp(dirNameTemplate));
}
// Handy class to avoid manually disconecting the socket.
// TODO: should it also connect?
struct ScopedLocalSocket : shared::rpc::IPCSocketClient {
using super = shared::rpc::IPCSocketClient;
// TODO, use another path.
ScopedLocalSocket() : IPCSocketClient(sockPath) {}
~ScopedLocalSocket() { IPCSocketClient::disconnect(); }
template <std::size_t N>
void
send_in_chunks(std::string_view data, int disconnect_after_chunk_n = -1)
{
int chunk_number{1};
auto chunks = chunk<N>(data);
for (auto &&part : chunks) {
if (::write(_sock, part.c_str(), part.size()) < 0) {
Debug(logTag, "error sending message :%s", std ::strerror(errno));
break;
}
if (disconnect_after_chunk_n == chunk_number) {
Debug(logTag, "Disconnecting it after chunk %d", chunk_number);
super::disconnect();
return;
}
++chunk_number;
}
}
// basic read, if fail, why it fail is irrelevant in this test.
std::string
read()
{
ts::LocalBufferWriter<32000> bw;
auto ret = super::read_all(bw);
if (ret == ReadStatus::NO_ERROR) {
return {bw.data(), bw.size()};
}
return {};
}
// small wrapper function to deal with the bw.
std::string
query(std::string_view msg)
{
ts::LocalBufferWriter<32000> bw;
auto ret = connect().send(msg).read_all(bw);
if (ret == ReadStatus::NO_ERROR) {
return {bw.data(), bw.size()};
}
return {};
}
private:
template <typename Iter, std::size_t N>
std::array<std::string, N>
chunk_impl(Iter from, Iter to)
{
const std::size_t size = std::distance(from, to);
if (size <= N) {
return {std::string{from, to}};
}
std::size_t index{0};
std::array<std::string, N> ret;
const std::size_t each_part = size / N;
const std::size_t remainder = size % N;
for (auto it = from; it != to;) {
if (std::size_t rem = std::distance(it, to); rem == (each_part + remainder)) {
ret[index++] = std::string{it, it + rem};
break;
}
ret[index++] = std::string{it, it + each_part};
std::advance(it, each_part);
}
return ret;
}
template <std::size_t N>
auto
chunk(std::string_view v)
{
return chunk_impl<std::string_view::const_iterator, N>(v.begin(), v.end());
}
};
// helper function to send a request and update the promise when the response is done.
// This is to be used in a multithread test.
void
send_request(std::string json, std::promise<std::string> p)
{
ScopedLocalSocket rpc_client;
auto resp = rpc_client.query(json);
p.set_value(resp);
}
} // namespace
TEST_CASE("Sending 'concurrent' requests to the rpc server.", "[thread]")
{
SECTION("A registered handlers")
{
rpc::add_method_handler("some_foo", &some_foo);
rpc::add_method_handler("some_foo2", &some_foo);
std::promise<std::string> p1;
std::promise<std::string> p2;
auto fut1 = p1.get_future();
auto fut2 = p2.get_future();
REQUIRE_NOTHROW([&]() {
// Two different clients, on the same server, as the server is an Unix Domain Socket, it should handle all this
// properly, in any case we just run the basic smoke test for our server.
auto t1 = std::thread(&send_request, R"({"jsonrpc": "2.0", "method": "some_foo", "params": {"duration": 1}, "id": "aBcD"})",
std::move(p1));
auto t2 = std::thread(&send_request, R"({"jsonrpc": "2.0", "method": "some_foo", "params": {"duration": 1}, "id": "eFgH"})",
std::move(p2));
// wait to get the promise set.
fut1.wait();
fut2.wait();
// the expected
std::string_view expected1{R"({"jsonrpc": "2.0", "result": {"res": "ok", "duration": "1"}, "id": "aBcD"})"};
std::string_view expected2{R"({"jsonrpc": "2.0", "result": {"res": "ok", "duration": "1"}, "id": "eFgH"})"};
CHECK(fut1.get() == expected1);
CHECK(fut2.get() == expected2);
t1.join();
t2.join();
}());
}
}
std::string
random_string(std::string::size_type length)
{
auto randchar = []() -> char {
const char charset[] = "0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
const size_t max_index = (sizeof(charset) - 1);
return charset[rand() % max_index];
};
std::string str(length, 0);
std::generate_n(str.begin(), length, randchar);
return str;
}
DEFINE_JSONRPC_PROTO_FUNCTION(do_nothing) // id, params, resp
{
ts::Rv<YAML::Node> resp;
resp.result()["size"] = params["msg"].as<std::string>().size();
return resp;
}
TEST_CASE("Basic message sending to a running server", "[socket]")
{
REQUIRE(rpc::add_method_handler("do_nothing", &do_nothing));
SECTION("Basic single request to the rpc server")
{
const int S{500};
auto json{R"({"jsonrpc": "2.0", "method": "do_nothing", "params": {"msg":")" + random_string(S) + R"("}, "id":"EfGh-1"})"};
REQUIRE_NOTHROW([&]() {
ScopedLocalSocket rpc_client;
auto resp = rpc_client.query(json);
REQUIRE(resp == R"({"jsonrpc": "2.0", "result": {"size": ")" + std::to_string(S) + R"("}, "id": "EfGh-1"})");
}());
}
REQUIRE(rpc::test_remove_handler("do_nothing"));
}
TEST_CASE("Sending a message bigger than the internal server's buffer. 32000", "[buffer][error]")
{
REQUIRE(rpc::add_method_handler("do_nothing", &do_nothing));
SECTION("Message larger than the the accepted size.")
{
const int S{32000}; // + the rest of the json message.
auto json{R"({"jsonrpc": "2.0", "method": "do_nothing", "params": {"msg":")" + random_string(S) + R"("}, "id":"EfGh-1"})"};
REQUIRE_NOTHROW([&]() {
ScopedLocalSocket rpc_client;
auto resp = rpc_client.query(json);
REQUIRE(resp.empty());
}());
}
REQUIRE(rpc::test_remove_handler("do_nothing"));
}
TEST_CASE("Test with invalid json message", "[socket]")
{
REQUIRE(rpc::add_method_handler("do_nothing", &do_nothing));
SECTION("A rpc server")
{
const int S{10};
auto json{R"({"jsonrpc": "2.0", "method": "do_nothing", "params": { "msg": ")" + random_string(S) + R"("}, "id": "EfGh})"};
REQUIRE_NOTHROW([&]() {
ScopedLocalSocket rpc_client;
auto resp = rpc_client.query(json);
CHECK(resp == R"({"jsonrpc": "2.0", "error": {"code": -32700, "message": "Parse error"}})");
}());
}
REQUIRE(rpc::test_remove_handler("do_nothing"));
}
TEST_CASE("Test with chunks", "[socket][chunks]")
{
REQUIRE(rpc::add_method_handler("do_nothing", &do_nothing));
SECTION("Sending request by chunks")
{
const int S{10};
auto json{R"({"jsonrpc": "2.0", "method": "do_nothing", "params": { "msg": ")" + random_string(S) +
R"("}, "id": "chunk-parts-3"})"};
REQUIRE_NOTHROW([&]() {
ScopedLocalSocket rpc_client;
using namespace std::chrono_literals;
rpc_client.connect();
rpc_client.send_in_chunks<3>(json);
auto resp = rpc_client.read();
REQUIRE(resp == R"({"jsonrpc": "2.0", "result": {"size": ")" + std::to_string(S) + R"("}, "id": "chunk-parts-3"})");
}());
}
REQUIRE(rpc::test_remove_handler("do_nothing"));
}
TEST_CASE("Test with chunks - disconnect after second part", "[socket][chunks]")
{
REQUIRE(rpc::add_method_handler("do_nothing", &do_nothing));
SECTION("Sending request by chunks")
{
const int S{4000};
auto json{R"({"jsonrpc": "2.0", "method": "do_nothing", "params": { "msg": ")" + random_string(S) +
R"("}, "id": "chunk-parts-3-2"})"};
REQUIRE_NOTHROW([&]() {
ScopedLocalSocket rpc_client;
using namespace std::chrono_literals;
rpc_client.connect();
rpc_client.send_in_chunks<3>(json, 2);
// read will fail.
auto resp = rpc_client.read();
REQUIRE(resp == "");
}());
}
REQUIRE(rpc::test_remove_handler("do_nothing"));
}
TEST_CASE("Test with chunks - incomplete message", "[socket][chunks]")
{
REQUIRE(rpc::add_method_handler("do_nothing", &do_nothing));
SECTION("Sending request by chunks, broken message")
{
const int S{50};
auto json{R"({"jsonrpc": "2.0", "method": "do_nothing", "params": { "msg": ")" + random_string(S) +
R"("}, "id": "chunk-parts-3)"};
// ^ missing-> "}
REQUIRE_NOTHROW([&]() {
ScopedLocalSocket rpc_client;
using namespace std::chrono_literals;
rpc_client.connect();
rpc_client.send_in_chunks<3>(json);
auto resp = rpc_client.read();
REQUIRE(resp == R"({"jsonrpc": "2.0", "error": {"code": -32700, "message": "Parse error"}})");
}());
}
REQUIRE(rpc::test_remove_handler("do_nothing"));
}
// Enable toggle
TEST_CASE("Test rpc enable toggle feature - default enabled.", "[default values]")
{
rpc::config::RPCConfig serverConfig;
REQUIRE(serverConfig.is_enabled() == true);
}
TEST_CASE("Test rpc enable toggle feature. Enabled by configuration", "[rpc][enabled]")
{
rpc::config::RPCConfig serverConfig;
auto confStr{R"({"rpc": {"enabled": true}})"};
std::cout << "'" << confStr << "'" << std::endl;
YAML::Node configNode = YAML::Load(confStr);
serverConfig.load(configNode["rpc"]);
REQUIRE(serverConfig.is_enabled() == true);
}
TEST_CASE("Test rpc enable toggle feature. Disabled by configuration", "[rpc][disabled]")
{
rpc::config::RPCConfig serverConfig;
auto confStr{R"({"rpc": {"enabled":false}})"};
REQUIRE_NOTHROW([&]() {
YAML::Node configNode = YAML::Load(confStr);
serverConfig.load(configNode["rpc"]);
}());
REQUIRE(serverConfig.is_enabled() == false);
}
// TEST UDS Server configuration
namespace
{
namespace trp = rpc::comm;
// This class is defined to get access to the protected config object inside the IPCSocketServer class.
struct LocalSocketTest : public trp::IPCSocketServer {
inline static const std::string _name = "LocalSocketTest";
bool
configure(YAML::Node const &params) override
{
return trp::IPCSocketServer::configure(params);
}
void
run() override
{
}
std::error_code
init() override
{
return trp::IPCSocketServer::init();
}
bool
stop() override
{
return true;
}
std::string const &
name() const override
{
return _name;
}
trp::IPCSocketServer::Config const &
get_conf() const
{
return _conf;
}
};
} // namespace
TEST_CASE("Test configuration parsing. UDS values", "[string]")
{
rpc::config::RPCConfig serverConfig;
auto confStr{R"({"rpc": { "enabled": true, "unix": { "lock_path_name": ")" + lockPath + R"(", "sock_path_name": ")" + sockPath +
R"(", "backlog": 5,"max_retry_on_transient_errors": 64 }}})"};
YAML::Node configNode = YAML::Load(confStr);
serverConfig.load(configNode["rpc"]);
REQUIRE(serverConfig.get_comm_type() == rpc::config::RPCConfig::CommType::UNIX);
auto socket = std::make_unique<LocalSocketTest>();
auto const ret = socket->configure(serverConfig.get_comm_config_params());
REQUIRE(ret);
REQUIRE(socket->get_conf().backlog == default_backlog);
REQUIRE(socket->get_conf().maxRetriesOnTransientErrors == default_maxRetriesOnTransientErrors);
REQUIRE(socket->get_conf().sockPathName == sockPath);
REQUIRE(socket->get_conf().lockPathName == lockPath);
}
TEST_CASE("Test configuration parsing from a file. UDS Server", "[file]")
{
fs::path sandboxDir = getTemporaryDir();
fs::path configPath = sandboxDir / "jsonrpc.yaml";
// define here to later compare.
std::string sockPathName{configPath.string() + "jsonrpc20_test2.sock"};
std::string lockPathName{configPath.string() + "jsonrpc20_test2.lock"};
auto confStr{R"({"rpc": { "enabled": true, "unix": { "lock_path_name": ")" + lockPathName + R"(", "sock_path_name": ")" +
sockPathName + R"(", "backlog": 5,"max_retry_on_transient_errors": 64 }}})"};
// write the config.
std::ofstream ofs(configPath.string(), std::ofstream::out);
// Yes, we write json into the yaml, remember, YAML is a superset of JSON, yaml parser can handle this.
ofs << confStr;
ofs.close();
rpc::config::RPCConfig serverConfig;
// on any error reading the file, default values will be used.
serverConfig.load_from_file(configPath.string());
REQUIRE(serverConfig.get_comm_type() == rpc::config::RPCConfig::CommType::UNIX);
auto socket = std::make_unique<LocalSocketTest>();
auto const &ret = socket->configure(serverConfig.get_comm_config_params());
REQUIRE(ret);
REQUIRE(socket->get_conf().backlog == 5);
REQUIRE(socket->get_conf().maxRetriesOnTransientErrors == 64);
REQUIRE(socket->get_conf().sockPathName == sockPathName);
REQUIRE(socket->get_conf().lockPathName == lockPathName);
std::error_code ec;
REQUIRE(fs::remove(sandboxDir, ec));
}