blob: f7e62c8114819c91006008f0b1830de43d65b234 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// brpc - A framework to host and access services throughout Baidu.
// Date: 2015/10/22 16:28:44
#include <gtest/gtest.h>
#include "brpc/server.h"
#include "brpc/controller.h"
#include "brpc/channel.h"
#include "brpc/stream_impl.h"
#include "echo.pb.h"
class AfterAcceptStream {
public:
virtual void action(brpc::StreamId) = 0;
};
class MyServiceWithStream : public test::EchoService {
public:
MyServiceWithStream(const brpc::StreamOptions& options)
: _options(options)
, _after_accept_stream(NULL)
{}
MyServiceWithStream(const brpc::StreamOptions& options,
AfterAcceptStream* after_accept_stream)
: _options(options)
, _after_accept_stream(after_accept_stream)
{}
MyServiceWithStream()
: _options()
, _after_accept_stream(NULL)
{}
void Echo(::google::protobuf::RpcController* controller,
const ::test::EchoRequest* request,
::test::EchoResponse* response,
::google::protobuf::Closure* done) {
brpc::ClosureGuard done_gurad(done);
response->set_message(request->message());
brpc::Controller* cntl = (brpc::Controller*)controller;
brpc::StreamId response_stream;
ASSERT_EQ(0, StreamAccept(&response_stream, *cntl, &_options));
LOG(INFO) << "Created response_stream=" << response_stream;
if (_after_accept_stream) {
_after_accept_stream->action(response_stream);
}
}
private:
brpc::StreamOptions _options;
AfterAcceptStream* _after_accept_stream;
};
class StreamingRpcTest : public testing::Test {
protected:
test::EchoRequest request;
test::EchoResponse response;
void SetUp() { request.set_message("hello world"); }
void TearDown() {}
};
TEST_F(StreamingRpcTest, sanity) {
brpc::Server server;
MyServiceWithStream service;
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
ASSERT_EQ(0, server.Start(9007, NULL));
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL));
brpc::Controller cntl;
brpc::StreamId request_stream;
ASSERT_EQ(0, StreamCreate(&request_stream, cntl, NULL));
brpc::ScopedStream stream_guard(request_stream);
test::EchoService_Stub stub(&channel);
stub.Echo(&cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream;
usleep(10);
brpc::StreamClose(request_stream);
server.Stop(0);
server.Join();
}
struct HandlerControl {
HandlerControl()
: block(false)
{}
bool block;
};
class OrderedInputHandler : public brpc::StreamInputHandler {
public:
explicit OrderedInputHandler(HandlerControl *cntl = NULL)
: _expected_next_value(0)
, _failed(false)
, _stopped(false)
, _idle_times(0)
, _cntl(cntl)
{
}
int on_received_messages(brpc::StreamId /*id*/,
butil::IOBuf *const messages[],
size_t size) {
if (_cntl && _cntl->block) {
while (_cntl->block) {
usleep(100);
}
}
for (size_t i = 0; i < size; ++i) {
CHECK(messages[i]->length() == sizeof(int));
int network = 0;
messages[i]->cutn(&network, sizeof(int));
EXPECT_EQ((int)ntohl(network), _expected_next_value++);
}
return 0;
}
void on_idle_timeout(brpc::StreamId /*id*/) {
++_idle_times;
}
void on_closed(brpc::StreamId /*id*/) {
ASSERT_FALSE(_stopped);
_stopped = true;
}
bool failed() const { return _failed; }
bool stopped() const { return _stopped; }
int idle_times() const { return _idle_times; }
private:
int _expected_next_value;
bool _failed;
bool _stopped;
int _idle_times;
HandlerControl* _cntl;
};
TEST_F(StreamingRpcTest, received_in_order) {
OrderedInputHandler handler;
brpc::StreamOptions opt;
opt.handler = &handler;
opt.messages_in_batch = 100;
brpc::Server server;
MyServiceWithStream service(opt);
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
ASSERT_EQ(0, server.Start(9007, NULL));
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL));
brpc::Controller cntl;
brpc::StreamId request_stream;
brpc::StreamOptions request_stream_options;
request_stream_options.max_buf_size = 0;
ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options));
brpc::ScopedStream stream_guard(request_stream);
test::EchoService_Stub stub(&channel);
stub.Echo(&cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream;
const int N = 10000;
for (int i = 0; i < N; ++i) {
int network = htonl(i);
butil::IOBuf out;
out.append(&network, sizeof(network));
ASSERT_EQ(0, brpc::StreamWrite(request_stream, out)) << "i=" << i;
}
ASSERT_EQ(0, brpc::StreamClose(request_stream));
server.Stop(0);
server.Join();
while (!handler.stopped()) {
usleep(100);
}
ASSERT_FALSE(handler.failed());
ASSERT_EQ(0, handler.idle_times());
ASSERT_EQ(N, handler._expected_next_value);
}
void on_writable(brpc::StreamId, void* arg, int error_code) {
std::pair<bool, int>* p = (std::pair<bool, int>*)arg;
p->first = true;
p->second = error_code;
LOG(INFO) << "error_code=" << error_code;
}
TEST_F(StreamingRpcTest, block) {
HandlerControl hc;
OrderedInputHandler handler(&hc);
hc.block = true;
brpc::StreamOptions opt;
opt.handler = &handler;
const int N = 10000;
opt.max_buf_size = sizeof(uint32_t) * N;
brpc::Server server;
MyServiceWithStream service(opt);
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
ASSERT_EQ(0, server.Start(9007, NULL));
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL));
brpc::Controller cntl;
brpc::StreamId request_stream;
brpc::ScopedStream stream_guard(request_stream);
brpc::StreamOptions request_stream_options;
request_stream_options.max_buf_size = sizeof(uint32_t) * N;
ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options));
test::EchoService_Stub stub(&channel);
stub.Echo(&cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream="
<< request_stream;
for (int i = 0; i < N; ++i) {
int network = htonl(i);
butil::IOBuf out;
out.append(&network, sizeof(network));
ASSERT_EQ(0, brpc::StreamWrite(request_stream, out)) << "i=" << i;
}
// sync wait
int dummy = 102030123;
butil::IOBuf out;
out.append(&dummy, sizeof(dummy));
ASSERT_EQ(EAGAIN, brpc::StreamWrite(request_stream, out));
hc.block = false;
ASSERT_EQ(0, brpc::StreamWait(request_stream, NULL));
// wait flushing all the pending messages
while (handler._expected_next_value != N) {
usleep(100);
}
// block hanlder again to test async wait
hc.block = true;
// async wait
for (int i = N; i < N + N; ++i) {
int network = htonl(i);
butil::IOBuf out;
out.append(&network, sizeof(network));
ASSERT_EQ(0, brpc::StreamWrite(request_stream, out)) << "i=" << i;
}
out.clear();
out.append(&dummy, sizeof(dummy));
ASSERT_EQ(EAGAIN, brpc::StreamWrite(request_stream, out));
hc.block = false;
std::pair<bool, int> p = std::make_pair(false, 0);
usleep(10);
brpc::StreamWait(request_stream, NULL, on_writable, &p);
while (!p.first) {
usleep(100);
}
ASSERT_EQ(0, p.second);
// wait flushing all the pending messages
while (handler._expected_next_value != N + N) {
usleep(100);
}
usleep(1000);
LOG(INFO) << "Starting block";
hc.block = true;
for (int i = N + N; i < N + N + N; ++i) {
int network = htonl(i);
butil::IOBuf out;
out.append(&network, sizeof(network));
ASSERT_EQ(0, brpc::StreamWrite(request_stream, out)) << "i=" << i - N - N;
}
out.clear();
out.append(&dummy, sizeof(dummy));
ASSERT_EQ(EAGAIN, brpc::StreamWrite(request_stream, out));
timespec duetime = butil::microseconds_from_now(1);
p.first = false;
LOG(INFO) << "Start wait";
brpc::StreamWait(request_stream, &duetime, on_writable, &p);
while (!p.first) {
usleep(100);
}
ASSERT_TRUE(p.first);
EXPECT_EQ(ETIMEDOUT, p.second);
hc.block = false;
ASSERT_EQ(0, brpc::StreamClose(request_stream));
while (!handler.stopped()) {
usleep(100);
}
ASSERT_FALSE(handler.failed());
ASSERT_EQ(0, handler.idle_times());
ASSERT_EQ(N + N + N, handler._expected_next_value);
}
TEST_F(StreamingRpcTest, auto_close_if_host_socket_closed) {
HandlerControl hc;
OrderedInputHandler handler(&hc);
hc.block = true;
brpc::StreamOptions opt;
opt.handler = &handler;
const int N = 10000;
opt.max_buf_size = sizeof(uint32_t) * N;
brpc::Server server;
MyServiceWithStream service(opt);
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
ASSERT_EQ(0, server.Start(9007, NULL));
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL));
brpc::Controller cntl;
brpc::StreamId request_stream;
brpc::StreamOptions request_stream_options;
request_stream_options.max_buf_size = sizeof(uint32_t) * N;
ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options));
brpc::ScopedStream stream_guard(request_stream);
test::EchoService_Stub stub(&channel);
stub.Echo(&cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream;
{
brpc::SocketUniquePtr ptr;
ASSERT_EQ(0, brpc::Socket::Address(request_stream, &ptr));
brpc::Stream* s = (brpc::Stream*)ptr->conn();
ASSERT_TRUE(s->_host_socket != NULL);
s->_host_socket->SetFailed();
}
usleep(100);
butil::IOBuf out;
out.append("test");
ASSERT_EQ(EINVAL, brpc::StreamWrite(request_stream, out));
while (!handler.stopped()) {
usleep(100);
}
ASSERT_FALSE(handler.failed());
ASSERT_EQ(0, handler.idle_times());
ASSERT_EQ(0, handler._expected_next_value);
}
TEST_F(StreamingRpcTest, idle_timeout) {
HandlerControl hc;
OrderedInputHandler handler(&hc);
hc.block = true;
brpc::StreamOptions opt;
opt.handler = &handler;
opt.idle_timeout_ms = 2;
const int N = 10000;
opt.max_buf_size = sizeof(uint32_t) * N;
brpc::Server server;
MyServiceWithStream service(opt);
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
ASSERT_EQ(0, server.Start(9007, NULL));
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL));
brpc::Controller cntl;
brpc::StreamId request_stream;
brpc::StreamOptions request_stream_options;
request_stream_options.max_buf_size = sizeof(uint32_t) * N;
ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options));
brpc::ScopedStream stream_guard(request_stream);
test::EchoService_Stub stub(&channel);
stub.Echo(&cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream;
usleep(10 * 1000 + 800);
ASSERT_EQ(0, brpc::StreamClose(request_stream));
while (!handler.stopped()) {
usleep(100);
}
ASSERT_FALSE(handler.failed());
// ASSERT_TRUE(handler.idle_times() >= 4 && handler.idle_times() <= 6)
// << handler.idle_times();
ASSERT_EQ(0, handler._expected_next_value);
}
class PingPongHandler : public brpc::StreamInputHandler {
public:
explicit PingPongHandler()
: _expected_next_value(0)
, _failed(false)
, _stopped(false)
, _idle_times(0)
{
}
int on_received_messages(brpc::StreamId id,
butil::IOBuf *const messages[],
size_t size) {
if (size != 1) {
_failed = true;
return 0;
}
for (size_t i = 0; i < size; ++i) {
CHECK(messages[i]->length() == sizeof(int));
int network = 0;
messages[i]->cutn(&network, sizeof(int));
if ((int)ntohl(network) != _expected_next_value) {
_failed = true;
}
int send_back = ntohl(network) + 1;
_expected_next_value = send_back + 1;
butil::IOBuf out;
network = htonl(send_back);
out.append(&network, sizeof(network));
// don't care the return value
brpc::StreamWrite(id, out);
}
return 0;
}
void on_idle_timeout(brpc::StreamId /*id*/) {
++_idle_times;
}
void on_closed(brpc::StreamId /*id*/) {
ASSERT_FALSE(_stopped);
_stopped = true;
}
bool failed() const { return _failed; }
bool stopped() const { return _stopped; }
int idle_times() const { return _idle_times; }
private:
int _expected_next_value;
bool _failed;
bool _stopped;
int _idle_times;
};
TEST_F(StreamingRpcTest, ping_pong) {
PingPongHandler resh;
brpc::StreamOptions opt;
opt.handler = &resh;
const int N = 10000;
opt.max_buf_size = sizeof(uint32_t) * N;
brpc::Server server;
MyServiceWithStream service(opt);
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
ASSERT_EQ(0, server.Start(9007, NULL));
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL));
brpc::Controller cntl;
brpc::StreamId request_stream;
brpc::StreamOptions request_stream_options;
PingPongHandler reqh;
reqh._expected_next_value = 1;
request_stream_options.handler = &reqh;
request_stream_options.max_buf_size = sizeof(uint32_t) * N;
ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options));
brpc::ScopedStream stream_guard(request_stream);
test::EchoService_Stub stub(&channel);
stub.Echo(&cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream;
int send = 0;
butil::IOBuf out;
out.append(&send, sizeof(send));
ASSERT_EQ(0, brpc::StreamWrite(request_stream, out));
usleep(10 * 1000);
ASSERT_EQ(0, brpc::StreamClose(request_stream));
while (!resh.stopped() || !reqh.stopped()) {
usleep(100);
}
ASSERT_FALSE(resh.failed());
ASSERT_FALSE(reqh.failed());
ASSERT_EQ(0, resh.idle_times());
ASSERT_EQ(0, reqh.idle_times());
}
class SendNAfterAcceptStream : public AfterAcceptStream {
public:
explicit SendNAfterAcceptStream(int n)
: _n(n) {}
void action(brpc::StreamId s) {
for (int i = 0; i < _n; ++i) {
int network = htonl(i);
butil::IOBuf out;
out.append(&network, sizeof(network));
ASSERT_EQ(0, brpc::StreamWrite(s, out)) << "i=" << i;
}
}
private:
int _n;
};
TEST_F(StreamingRpcTest, server_send_data_before_run_done) {
const int N = 10000;
SendNAfterAcceptStream after_accept(N);
brpc::StreamOptions opt;
opt.max_buf_size = -1;
brpc::Server server;
MyServiceWithStream service(opt, &after_accept);
ASSERT_EQ(0, server.AddService(&service, brpc::SERVER_DOESNT_OWN_SERVICE));
ASSERT_EQ(0, server.Start(9007, NULL));
brpc::Channel channel;
ASSERT_EQ(0, channel.Init("127.0.0.1:9007", NULL));
OrderedInputHandler handler;
brpc::StreamOptions request_stream_options;
brpc::StreamId request_stream;
brpc::Controller cntl;
request_stream_options.handler = &handler;
ASSERT_EQ(0, StreamCreate(&request_stream, cntl, &request_stream_options));
brpc::ScopedStream stream_guard(request_stream);
test::EchoService_Stub stub(&channel);
stub.Echo(&cntl, &request, &response, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText() << " request_stream=" << request_stream;
// wait flushing all the pending messages
while (handler._expected_next_value != N) {
usleep(100);
}
ASSERT_EQ(0, brpc::StreamClose(request_stream));
while (!handler.stopped()) {
usleep(100);
}
ASSERT_FALSE(handler.failed());
ASSERT_EQ(0, handler.idle_times());
}