blob: cef7f40ec9aa62546416894f97485de27b03c33d [file] [log] [blame]
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include <string.h>
#include <chrono>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include "gtest/gtest.h"
#include "runtime/api_layer1.h"
#include "runtime/api_task.h"
#include "runtime/global_config.h"
#include "runtime/rpc/asio_net_provider.h"
#include "runtime/rpc/network.h"
#include "runtime/rpc/network.sim.h"
#include "runtime/rpc/rpc_address.h"
#include "runtime/rpc/rpc_engine.h"
#include "runtime/rpc/rpc_message.h"
#include "runtime/rpc/serialization.h"
#include "runtime/service_engine.h"
#include "runtime/task/task.h"
#include "runtime/task/task_code.h"
#include "runtime/task/task_spec.h"
#include "runtime/test_utils.h"
#include "utils/autoref_ptr.h"
#include "utils/error_code.h"
#include "utils/flags.h"
#include "utils/fmt_logging.h"
namespace dsn {
DSN_DECLARE_uint32(conn_threshold_per_ip);
class asio_network_provider_test : public tools::asio_network_provider
{
public:
asio_network_provider_test(rpc_engine *srv, network *inner_provider)
: tools::asio_network_provider(srv, inner_provider)
{
}
};
static int TEST_PORT = 20401;
DEFINE_TASK_CODE_RPC(RPC_TEST_NETPROVIDER, TASK_PRIORITY_COMMON, THREAD_POOL_TEST_SERVER)
volatile int wait_flag = 0;
void response_handler(dsn::error_code ec,
dsn::message_ex *req,
dsn::message_ex *resp,
void *request_buf)
{
if (ERR_OK == ec) {
std::string response_string;
char *request_str = (char *)(request_buf);
::dsn::unmarshall(resp, response_string);
ASSERT_EQ(response_string, request_str);
} else {
LOG_INFO("error msg: {}", ec);
}
wait_flag = 1;
}
void reject_response_handler(dsn::error_code ec)
{
wait_flag = 1;
ASSERT_TRUE(ERR_TIMEOUT == ec);
}
void rpc_server_response(dsn::message_ex *request)
{
std::string str_command;
::dsn::unmarshall(request, str_command);
dsn::message_ex *response = request->create_response();
::dsn::marshall(response, str_command);
dsn_rpc_reply(response);
}
void wait_response()
{
while (wait_flag == 0)
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
void rpc_client_session_send(rpc_session_ptr client_session, bool reject = false)
{
message_ex *msg = message_ex::create_request(RPC_TEST_NETPROVIDER, 0, 0);
std::unique_ptr<char[]> buf(new char[128]);
memset(buf.get(), 0, 128);
strcpy(buf.get(), "hello world");
::dsn::marshall(msg, std::string(buf.get()));
wait_flag = 0;
if (!reject) {
rpc_response_task *t = new rpc_response_task(msg,
std::bind(&response_handler,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
buf.get()),
0);
client_session->net().engine()->matcher()->on_call(msg, t);
} else {
rpc_response_task *t = new rpc_response_task(
msg, std::bind(&reject_response_handler, std::placeholders::_1), 0);
client_session->net().engine()->matcher()->on_call(msg, t);
}
client_session->send_message(msg);
wait_response();
}
TEST(tools_common, asio_net_provider)
{
if (dsn::service_engine::instance().spec().semaphore_factory_name ==
"dsn::tools::sim_semaphore_provider")
return;
ASSERT_TRUE(dsn_rpc_register_handler(
RPC_TEST_NETPROVIDER, "rpc.test.netprovider", rpc_server_response));
std::unique_ptr<tools::asio_network_provider> asio_network(
new tools::asio_network_provider(task::get_current_rpc(), nullptr));
error_code start_result;
start_result = asio_network->start(RPC_CHANNEL_TCP, TEST_PORT, true);
ASSERT_TRUE(start_result == ERR_OK);
// the same asio network handle, start only client is ok
start_result = asio_network->start(RPC_CHANNEL_TCP, TEST_PORT, true);
ASSERT_TRUE(start_result == ERR_OK);
rpc_address network_addr = asio_network->address();
ASSERT_TRUE(network_addr.port() == TEST_PORT);
std::unique_ptr<tools::asio_network_provider> asio_network2(
new tools::asio_network_provider(task::get_current_rpc(), nullptr));
start_result = asio_network2->start(RPC_CHANNEL_TCP, TEST_PORT, true);
ASSERT_TRUE(start_result == ERR_OK);
start_result = asio_network2->start(RPC_CHANNEL_TCP, TEST_PORT, false);
ASSERT_TRUE(start_result == ERR_OK);
LOG_INFO("result: {}", start_result);
start_result = asio_network2->start(RPC_CHANNEL_TCP, TEST_PORT, false);
ASSERT_TRUE(start_result == ERR_SERVICE_ALREADY_RUNNING);
LOG_INFO("result: {}", start_result);
rpc_session_ptr client_session =
asio_network->create_client_session(rpc_address("localhost", TEST_PORT));
client_session->connect();
rpc_client_session_send(client_session);
ASSERT_TRUE(dsn_rpc_unregiser_handler(RPC_TEST_NETPROVIDER));
TEST_PORT++;
}
TEST(tools_common, asio_udp_provider)
{
if (dsn::service_engine::instance().spec().semaphore_factory_name ==
"dsn::tools::sim_semaphore_provider")
return;
ASSERT_TRUE(dsn_rpc_register_handler(
RPC_TEST_NETPROVIDER, "rpc.test.netprovider", rpc_server_response));
std::unique_ptr<tools::asio_udp_provider> client(
new tools::asio_udp_provider(task::get_current_rpc(), nullptr));
error_code start_result;
start_result = client->start(RPC_CHANNEL_UDP, 0, true);
ASSERT_TRUE(start_result == ERR_OK);
start_result = client->start(RPC_CHANNEL_UDP, TEST_PORT, false);
ASSERT_TRUE(start_result == ERR_OK);
message_ex *msg = message_ex::create_request(RPC_TEST_NETPROVIDER, 0, 0);
std::unique_ptr<char[]> buf(new char[128]);
memset(buf.get(), 0, 128);
strcpy(buf.get(), "hello world");
::dsn::marshall(msg, std::string(buf.get()));
wait_flag = 0;
rpc_response_task *t = new rpc_response_task(msg,
std::bind(&response_handler,
std::placeholders::_1,
std::placeholders::_2,
std::placeholders::_3,
buf.get()),
0);
client->engine()->matcher()->on_call(msg, t);
client->send_message(msg);
wait_response();
ASSERT_TRUE(dsn_rpc_unregiser_handler(RPC_TEST_NETPROVIDER));
TEST_PORT++;
}
TEST(tools_common, sim_net_provider)
{
if (dsn::service_engine::instance().spec().semaphore_factory_name ==
"dsn::tools::sim_semaphore_provider")
return;
ASSERT_TRUE(dsn_rpc_register_handler(
RPC_TEST_NETPROVIDER, "rpc.test.netprovider", rpc_server_response));
std::unique_ptr<tools::sim_network_provider> sim_net(
new tools::sim_network_provider(task::get_current_rpc(), nullptr));
error_code ans;
ans = sim_net->start(RPC_CHANNEL_TCP, TEST_PORT, false);
ASSERT_TRUE(ans == ERR_OK);
ans = sim_net->start(RPC_CHANNEL_TCP, TEST_PORT, false);
ASSERT_TRUE(ans == ERR_ADDRESS_ALREADY_USED);
rpc_session_ptr client_session =
sim_net->create_client_session(rpc_address("localhost", TEST_PORT));
client_session->connect();
rpc_client_session_send(client_session);
ASSERT_TRUE(dsn_rpc_unregiser_handler(RPC_TEST_NETPROVIDER));
TEST_PORT++;
}
TEST(tools_common, asio_network_provider_connection_threshold)
{
if (dsn::service_engine::instance().spec().semaphore_factory_name ==
"dsn::tools::sim_semaphore_provider")
return;
ASSERT_TRUE(dsn_rpc_register_handler(
RPC_TEST_NETPROVIDER, "rpc.test.netprovider", rpc_server_response));
std::unique_ptr<asio_network_provider_test> asio_network(
new asio_network_provider_test(task::get_current_rpc(), nullptr));
error_code start_result;
start_result = asio_network->start(RPC_CHANNEL_TCP, TEST_PORT, false);
ASSERT_TRUE(start_result == ERR_OK);
auto CONN_THRESHOLD = 3;
LOG_INFO("change FLAGS_conn_threshold_per_ip {} -> {} for test",
FLAGS_conn_threshold_per_ip,
CONN_THRESHOLD);
FLAGS_conn_threshold_per_ip = CONN_THRESHOLD;
// not exceed threshold
for (int count = 0; count < CONN_THRESHOLD + 2; count++) {
LOG_INFO("client # {}", count);
rpc_session_ptr client_session =
asio_network->create_client_session(rpc_address("localhost", TEST_PORT));
client_session->connect();
rpc_client_session_send(client_session);
client_session->close();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
// exceed threshold
bool reject = false;
for (int count = 0; count < CONN_THRESHOLD + 2; count++) {
LOG_INFO("client # {}", count);
rpc_session_ptr client_session =
asio_network->create_client_session(rpc_address("localhost", TEST_PORT));
client_session->connect();
if (count >= CONN_THRESHOLD)
reject = true;
rpc_client_session_send(client_session, reject);
}
ASSERT_TRUE(dsn_rpc_unregiser_handler(RPC_TEST_NETPROVIDER));
TEST_PORT++;
}
} // namespace dsn