blob: b4e0547c493b99903a79ed0892b75387099cdbd1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// brpc - A framework to host and access services throughout Baidu.
// Date: Sun Jul 13 15:04:18 CST 2014
#include <cstddef>
#include <google/protobuf/stubs/logging.h>
#include <string>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <butil/build_config.h> // OS_MACOSX
#if defined(OS_MACOSX)
#include <sys/event.h>
#endif
#include <gtest/gtest.h>
#include <gflags/gflags.h>
#include <google/protobuf/text_format.h>
#include <unistd.h>
#include <butil/strings/string_number_conversions.h>
#include <brpc/policy/http_rpc_protocol.h>
#include <butil/base64.h>
#include "brpc/http_method.h"
#include "butil/iobuf.h"
#include "butil/logging.h"
#include "butil/files/scoped_file.h"
#include "butil/fd_guard.h"
#include "butil/file_util.h"
#include "brpc/socket.h"
#include "brpc/acceptor.h"
#include "brpc/server.h"
#include "brpc/channel.h"
#include "brpc/policy/most_common_message.h"
#include "echo.pb.h"
#include "brpc/policy/http_rpc_protocol.h"
#include "brpc/policy/http2_rpc_protocol.h"
#include "json2pb/pb_to_json.h"
#include "json2pb/json_to_pb.h"
#include "brpc/details/method_status.h"
#include "brpc/rpc_dump.h"
#include "bthread/unstable.h"
namespace brpc {
DECLARE_bool(rpc_dump);
DECLARE_string(rpc_dump_dir);
DECLARE_int32(rpc_dump_max_requests_in_one_file);
extern bvar::CollectorSpeedLimit g_rpc_dump_sl;
}
int main(int argc, char* argv[]) {
testing::InitGoogleTest(&argc, argv);
GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
if (GFLAGS_NS::SetCommandLineOption("socket_max_unwritten_bytes", "2000000").empty()) {
std::cerr << "Fail to set -socket_max_unwritten_bytes" << std::endl;
return -1;
}
if (GFLAGS_NS::SetCommandLineOption("crash_on_fatal_log", "true").empty()) {
std::cerr << "Fail to set -crash_on_fatal_log" << std::endl;
return -1;
}
return RUN_ALL_TESTS();
}
namespace {
static const std::string EXP_REQUEST = "hello";
static const std::string EXP_RESPONSE = "world";
static const std::string EXP_RESPONSE_CONTENT_LENGTH = "1024";
static const std::string EXP_RESPONSE_TRANSFER_ENCODING = "chunked";
static const std::string MOCK_CREDENTIAL = "mock credential";
static const std::string MOCK_USER = "mock user";
class MyAuthenticator : public brpc::Authenticator {
public:
MyAuthenticator() {}
int GenerateCredential(std::string* auth_str) const {
*auth_str = MOCK_CREDENTIAL;
return 0;
}
int VerifyCredential(const std::string& auth_str,
const butil::EndPoint&,
brpc::AuthContext* ctx) const {
EXPECT_EQ(MOCK_CREDENTIAL, auth_str);
ctx->set_user(MOCK_USER);
return 0;
}
};
class MyEchoService : public ::test::EchoService {
public:
void Echo(::google::protobuf::RpcController* cntl_base,
const ::test::EchoRequest* req,
::test::EchoResponse* res,
::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl =
static_cast<brpc::Controller*>(cntl_base);
const std::string* sleep_ms_str =
cntl->http_request().uri().GetQuery("sleep_ms");
if (sleep_ms_str) {
bthread_usleep(strtol(sleep_ms_str->data(), NULL, 10) * 1000);
}
res->set_message(EXP_RESPONSE);
}
};
class HttpTest : public ::testing::Test{
protected:
HttpTest() {
EXPECT_EQ(0, _server.AddBuiltinServices());
EXPECT_EQ(0, _server.AddService(
&_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
// Hack: Regard `_server' as running
_server._status = brpc::Server::RUNNING;
_server._options.auth = &_auth;
EXPECT_EQ(0, pipe(_pipe_fds));
brpc::SocketId id;
brpc::SocketOptions options;
options.fd = _pipe_fds[1];
EXPECT_EQ(0, brpc::Socket::Create(options, &id));
EXPECT_EQ(0, brpc::Socket::Address(id, &_socket));
brpc::SocketOptions h2_client_options;
h2_client_options.user = brpc::get_client_side_messenger();
h2_client_options.fd = _pipe_fds[1];
EXPECT_EQ(0, brpc::Socket::Create(h2_client_options, &id));
EXPECT_EQ(0, brpc::Socket::Address(id, &_h2_client_sock));
};
virtual ~HttpTest() {};
virtual void SetUp() {};
virtual void TearDown() {};
void VerifyMessage(brpc::InputMessageBase* msg, bool expect) {
if (msg->_socket == NULL) {
_socket->ReAddress(&msg->_socket);
}
msg->_arg = &_server;
EXPECT_EQ(expect, brpc::policy::VerifyHttpRequest(msg));
}
void ProcessMessage(void (*process)(brpc::InputMessageBase*),
brpc::InputMessageBase* msg, bool set_eof) {
if (msg->_socket == NULL) {
_socket->ReAddress(&msg->_socket);
}
msg->_arg = &_server;
_socket->PostponeEOF();
if (set_eof) {
_socket->SetEOF();
}
(*process)(msg);
}
brpc::policy::HttpContext* MakePostRequestMessage(const std::string& path) {
brpc::policy::HttpContext* msg = new brpc::policy::HttpContext(false);
msg->header().uri().set_path(path);
msg->header().set_content_type("application/json");
msg->header().set_method(brpc::HTTP_METHOD_POST);
test::EchoRequest req;
req.set_message(EXP_REQUEST);
butil::IOBufAsZeroCopyOutputStream req_stream(&msg->body());
EXPECT_TRUE(json2pb::ProtoMessageToJson(req, &req_stream, NULL));
return msg;
}
brpc::policy::HttpContext* MakePostProtoTextRequestMessage(
const std::string& path) {
brpc::policy::HttpContext* msg = new brpc::policy::HttpContext(false);
msg->header().uri().set_path(path);
msg->header().set_content_type("application/proto-text");
msg->header().set_method(brpc::HTTP_METHOD_POST);
test::EchoRequest req;
req.set_message(EXP_REQUEST);
butil::IOBufAsZeroCopyOutputStream req_stream(&msg->body());
EXPECT_TRUE(google::protobuf::TextFormat::Print(req, &req_stream));
return msg;
}
brpc::policy::HttpContext* MakeGetRequestMessage(const std::string& path) {
brpc::policy::HttpContext* msg = new brpc::policy::HttpContext(false);
msg->header().uri().set_path(path);
msg->header().set_method(brpc::HTTP_METHOD_GET);
return msg;
}
brpc::policy::HttpContext* MakeResponseMessage(int code) {
brpc::policy::HttpContext* msg = new brpc::policy::HttpContext(false);
msg->header().set_status_code(code);
msg->header().set_content_type("application/json");
test::EchoResponse res;
res.set_message(EXP_RESPONSE);
butil::IOBufAsZeroCopyOutputStream res_stream(&msg->body());
EXPECT_TRUE(json2pb::ProtoMessageToJson(res, &res_stream, NULL));
return msg;
}
void CheckResponseCode(bool expect_empty, int expect_code) {
int bytes_in_pipe = 0;
ioctl(_pipe_fds[0], FIONREAD, &bytes_in_pipe);
if (expect_empty) {
EXPECT_EQ(0, bytes_in_pipe);
return;
}
EXPECT_GT(bytes_in_pipe, 0);
butil::IOPortal buf;
EXPECT_EQ((ssize_t)bytes_in_pipe,
buf.append_from_file_descriptor(_pipe_fds[0], 1024));
brpc::ParseResult pr =
brpc::policy::ParseHttpMessage(&buf, _socket.get(), false, NULL);
EXPECT_EQ(brpc::PARSE_OK, pr.error());
brpc::policy::HttpContext* msg =
static_cast<brpc::policy::HttpContext*>(pr.message());
EXPECT_EQ(expect_code, msg->header().status_code());
msg->Destroy();
}
void MakeH2EchoRequestBuf(butil::IOBuf* out, brpc::Controller* cntl, int* h2_stream_id) {
butil::IOBuf request_buf;
test::EchoRequest req;
req.set_message(EXP_REQUEST);
cntl->http_request().set_method(brpc::HTTP_METHOD_POST);
brpc::policy::SerializeHttpRequest(&request_buf, cntl, &req);
ASSERT_FALSE(cntl->Failed());
brpc::policy::H2UnsentRequest* h2_req = brpc::policy::H2UnsentRequest::New(cntl);
cntl->_current_call.stream_user_data = h2_req;
brpc::SocketMessage* socket_message = NULL;
brpc::policy::PackH2Request(NULL, &socket_message, cntl->call_id().value,
NULL, cntl, request_buf, NULL);
butil::Status st = socket_message->AppendAndDestroySelf(out, _h2_client_sock.get());
ASSERT_TRUE(st.ok());
*h2_stream_id = h2_req->_stream_id;
}
void MakeH2EchoResponseBuf(butil::IOBuf* out, int h2_stream_id) {
brpc::Controller cntl;
test::EchoResponse res;
res.set_message(EXP_RESPONSE);
cntl.http_request().set_content_type("application/proto");
{
butil::IOBufAsZeroCopyOutputStream wrapper(&cntl.response_attachment());
EXPECT_TRUE(res.SerializeToZeroCopyStream(&wrapper));
}
brpc::policy::H2UnsentResponse* h2_res = brpc::policy::H2UnsentResponse::New(&cntl, h2_stream_id, false /*is grpc*/);
butil::Status st = h2_res->AppendAndDestroySelf(out, _h2_client_sock.get());
ASSERT_TRUE(st.ok());
}
int _pipe_fds[2];
brpc::SocketUniquePtr _socket;
brpc::SocketUniquePtr _h2_client_sock;
brpc::Server _server;
MyEchoService _svc;
MyAuthenticator _auth;
};
TEST_F(HttpTest, indenting_ostream) {
std::ostringstream os1;
brpc::IndentingOStream is1(os1, 2);
brpc::IndentingOStream is2(is1, 2);
os1 << "begin1\nhello" << std::endl << "world\nend1" << std::endl;
is1 << "begin2\nhello" << std::endl << "world\nend2" << std::endl;
is2 << "begin3\nhello" << std::endl << "world\nend3" << std::endl;
ASSERT_EQ(
"begin1\nhello\nworld\nend1\nbegin2\n hello\n world\n end2\n"
" begin3\n hello\n world\n end3\n",
os1.str());
}
TEST_F(HttpTest, parse_http_address) {
const std::string EXP_HOSTNAME = "www.baidu.com:9876";
butil::EndPoint EXP_ENDPOINT;
{
std::string url = "https://" + EXP_HOSTNAME;
EXPECT_TRUE(brpc::policy::ParseHttpServerAddress(&EXP_ENDPOINT, url.c_str()));
}
{
butil::EndPoint ep;
std::string url = "http://" +
std::string(endpoint2str(EXP_ENDPOINT).c_str());
EXPECT_TRUE(brpc::policy::ParseHttpServerAddress(&ep, url.c_str()));
EXPECT_EQ(EXP_ENDPOINT, ep);
}
{
butil::EndPoint ep;
std::string url = "https://" +
std::string(butil::ip2str(EXP_ENDPOINT.ip).c_str());
EXPECT_TRUE(brpc::policy::ParseHttpServerAddress(&ep, url.c_str()));
EXPECT_EQ(EXP_ENDPOINT.ip, ep.ip);
EXPECT_EQ(443, ep.port);
}
{
butil::EndPoint ep;
EXPECT_FALSE(brpc::policy::ParseHttpServerAddress(&ep, "invalid_url"));
}
{
butil::EndPoint ep;
EXPECT_FALSE(brpc::policy::ParseHttpServerAddress(
&ep, "https://no.such.machine:9090"));
}
}
TEST_F(HttpTest, verify_request) {
{
brpc::policy::HttpContext* msg =
MakePostRequestMessage("/EchoService/Echo");
VerifyMessage(msg, false);
msg->Destroy();
}
{
brpc::policy::HttpContext* msg = MakeGetRequestMessage("/status");
VerifyMessage(msg, true);
msg->Destroy();
}
{
brpc::policy::HttpContext* msg =
MakePostRequestMessage("/EchoService/Echo");
_socket->SetFailed();
VerifyMessage(msg, false);
msg->Destroy();
}
{
brpc::policy::HttpContext* msg =
MakePostProtoTextRequestMessage("/EchoService/Echo");
VerifyMessage(msg, false);
msg->Destroy();
}
}
TEST_F(HttpTest, process_request_failed_socket) {
brpc::policy::HttpContext* msg = MakePostRequestMessage("/EchoService/Echo");
_socket->SetFailed();
ProcessMessage(brpc::policy::ProcessHttpRequest, msg, false);
ASSERT_EQ(0ll, _server._nerror_bvar.get_value());
CheckResponseCode(true, 0);
}
TEST_F(HttpTest, reject_get_to_pb_services_with_required_fields) {
brpc::policy::HttpContext* msg = MakeGetRequestMessage("/EchoService/Echo");
_server._status = brpc::Server::RUNNING;
ProcessMessage(brpc::policy::ProcessHttpRequest, msg, false);
ASSERT_EQ(0ll, _server._nerror_bvar.get_value());
const brpc::Server::MethodProperty* mp =
_server.FindMethodPropertyByFullName("test.EchoService.Echo");
ASSERT_TRUE(mp);
ASSERT_TRUE(mp->status);
ASSERT_EQ(1ll, mp->status->_nerror_bvar.get_value());
CheckResponseCode(false, brpc::HTTP_STATUS_BAD_REQUEST);
}
TEST_F(HttpTest, process_request_logoff) {
brpc::policy::HttpContext* msg = MakePostRequestMessage("/EchoService/Echo");
_server._status = brpc::Server::READY;
ProcessMessage(brpc::policy::ProcessHttpRequest, msg, false);
ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
CheckResponseCode(false, brpc::HTTP_STATUS_SERVICE_UNAVAILABLE);
}
TEST_F(HttpTest, process_request_wrong_method) {
brpc::policy::HttpContext* msg = MakePostRequestMessage("/NO_SUCH_METHOD");
ProcessMessage(brpc::policy::ProcessHttpRequest, msg, false);
ASSERT_EQ(1ll, _server._nerror_bvar.get_value());
CheckResponseCode(false, brpc::HTTP_STATUS_NOT_FOUND);
}
TEST_F(HttpTest, process_response_after_eof) {
test::EchoResponse res;
brpc::Controller cntl;
cntl._response = &res;
brpc::policy::HttpContext* msg =
MakeResponseMessage(brpc::HTTP_STATUS_OK);
_socket->set_correlation_id(cntl.call_id().value);
ProcessMessage(brpc::policy::ProcessHttpResponse, msg, true);
ASSERT_EQ(EXP_RESPONSE, res.message());
ASSERT_TRUE(_socket->Failed());
}
TEST_F(HttpTest, process_response_error_code) {
{
brpc::Controller cntl;
_socket->set_correlation_id(cntl.call_id().value);
brpc::policy::HttpContext* msg =
MakeResponseMessage(brpc::HTTP_STATUS_CONTINUE);
ProcessMessage(brpc::policy::ProcessHttpResponse, msg, false);
ASSERT_EQ(brpc::EHTTP, cntl.ErrorCode());
ASSERT_EQ(brpc::HTTP_STATUS_CONTINUE, cntl.http_response().status_code());
}
{
brpc::Controller cntl;
_socket->set_correlation_id(cntl.call_id().value);
brpc::policy::HttpContext* msg =
MakeResponseMessage(brpc::HTTP_STATUS_TEMPORARY_REDIRECT);
ProcessMessage(brpc::policy::ProcessHttpResponse, msg, false);
ASSERT_EQ(brpc::EHTTP, cntl.ErrorCode());
ASSERT_EQ(brpc::HTTP_STATUS_TEMPORARY_REDIRECT,
cntl.http_response().status_code());
}
{
brpc::Controller cntl;
_socket->set_correlation_id(cntl.call_id().value);
brpc::policy::HttpContext* msg =
MakeResponseMessage(brpc::HTTP_STATUS_BAD_REQUEST);
ProcessMessage(brpc::policy::ProcessHttpResponse, msg, false);
ASSERT_EQ(brpc::EHTTP, cntl.ErrorCode());
ASSERT_EQ(brpc::HTTP_STATUS_BAD_REQUEST,
cntl.http_response().status_code());
}
{
brpc::Controller cntl;
_socket->set_correlation_id(cntl.call_id().value);
brpc::policy::HttpContext* msg = MakeResponseMessage(12345);
ProcessMessage(brpc::policy::ProcessHttpResponse, msg, false);
ASSERT_EQ(brpc::EHTTP, cntl.ErrorCode());
ASSERT_EQ(12345, cntl.http_response().status_code());
}
}
TEST_F(HttpTest, complete_flow) {
butil::IOBuf request_buf;
butil::IOBuf total_buf;
brpc::Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
cntl._response = &res;
cntl._connection_type = brpc::CONNECTION_TYPE_SHORT;
cntl._method = test::EchoService::descriptor()->method(0);
ASSERT_EQ(0, brpc::Socket::Address(_socket->id(), &cntl._current_call.sending_sock));
// Send request
req.set_message(EXP_REQUEST);
brpc::policy::SerializeHttpRequest(&request_buf, &cntl, &req);
ASSERT_FALSE(cntl.Failed());
brpc::policy::PackHttpRequest(
&total_buf, NULL, cntl.call_id().value,
cntl._method, &cntl, request_buf, &_auth);
ASSERT_FALSE(cntl.Failed());
// Verify and handle request
brpc::ParseResult req_pr =
brpc::policy::ParseHttpMessage(&total_buf, _socket.get(), false, NULL);
ASSERT_EQ(brpc::PARSE_OK, req_pr.error());
brpc::InputMessageBase* req_msg = req_pr.message();
VerifyMessage(req_msg, true);
ProcessMessage(brpc::policy::ProcessHttpRequest, req_msg, false);
// Read response from pipe
butil::IOPortal response_buf;
response_buf.append_from_file_descriptor(_pipe_fds[0], 1024);
brpc::ParseResult res_pr =
brpc::policy::ParseHttpMessage(&response_buf, _socket.get(), false, NULL);
ASSERT_EQ(brpc::PARSE_OK, res_pr.error());
brpc::InputMessageBase* res_msg = res_pr.message();
ProcessMessage(brpc::policy::ProcessHttpResponse, res_msg, false);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_EQ(EXP_RESPONSE, res.message());
}
TEST_F(HttpTest, chunked_uploading) {
const int port = 8923;
brpc::Server server;
EXPECT_EQ(0, server.AddService(&_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, server.Start(port, NULL));
// Send request via curl using chunked encoding
const std::string req = "{\"message\":\"hello\"}";
const std::string res_fname = "curl.out";
std::string cmd;
butil::string_printf(&cmd, "curl -X POST -d '%s' -H 'Transfer-Encoding:chunked' "
"-H 'Content-Type:application/json' -o %s "
"http://localhost:%d/EchoService/Echo",
req.c_str(), res_fname.c_str(), port);
ASSERT_EQ(0, system(cmd.c_str()));
// Check response
const std::string exp_res = "{\"message\":\"world\"}";
butil::ScopedFILE fp(res_fname.c_str(), "r");
char buf[128];
ASSERT_TRUE(fgets(buf, sizeof(buf), fp));
EXPECT_EQ(exp_res, std::string(buf));
}
enum DonePlace {
DONE_BEFORE_CREATE_PA = 0,
DONE_AFTER_CREATE_PA_BEFORE_DESTROY_PA,
DONE_AFTER_DESTROY_PA,
};
// For writing into PA.
const char PA_DATA[] = "abcdefghijklmnopqrstuvwxyz1234567890!@#$%^&*()_=-+";
const size_t PA_DATA_LEN = sizeof(PA_DATA) - 1/*not count the ending zero*/;
static void CopyPAPrefixedWithSeqNo(char* buf, uint64_t seq_no) {
memcpy(buf, PA_DATA, PA_DATA_LEN);
*(uint64_t*)buf = seq_no;
}
class DownloadServiceImpl : public ::test::DownloadService {
public:
DownloadServiceImpl(DonePlace done_place = DONE_BEFORE_CREATE_PA,
size_t num_repeat = 1)
: _done_place(done_place)
, _nrep(num_repeat)
, _nwritten(0)
, _ever_full(false)
, _last_errno(0) {}
void Download(::google::protobuf::RpcController* cntl_base,
const ::test::HttpRequest*,
::test::HttpResponse*,
::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl =
static_cast<brpc::Controller*>(cntl_base);
cntl->http_response().set_content_type("text/plain");
brpc::StopStyle stop_style = (_nrep == std::numeric_limits<size_t>::max()
? brpc::FORCE_STOP : brpc::WAIT_FOR_STOP);
butil::intrusive_ptr<brpc::ProgressiveAttachment> pa
= cntl->CreateProgressiveAttachment(stop_style);
if (pa == NULL) {
cntl->SetFailed("The socket was just failed");
return;
}
if (_done_place == DONE_BEFORE_CREATE_PA) {
done_guard.reset(NULL);
}
ASSERT_GT(PA_DATA_LEN, 8u); // long enough to hold a 64-bit decimal.
char buf[PA_DATA_LEN];
for (size_t c = 0; c < _nrep;) {
CopyPAPrefixedWithSeqNo(buf, c);
if (pa->Write(buf, sizeof(buf)) != 0) {
if (errno == brpc::EOVERCROWDED) {
LOG_EVERY_SECOND(INFO) << "full pa=" << pa.get();
_ever_full = true;
bthread_usleep(10000);
continue;
} else {
_last_errno = errno;
break;
}
} else {
_nwritten += PA_DATA_LEN;
}
++c;
}
if (_done_place == DONE_AFTER_CREATE_PA_BEFORE_DESTROY_PA) {
done_guard.reset(NULL);
}
LOG(INFO) << "Destroy pa=" << pa.get();
pa.reset(NULL);
if (_done_place == DONE_AFTER_DESTROY_PA) {
done_guard.reset(NULL);
}
}
void DownloadFailed(::google::protobuf::RpcController* cntl_base,
const ::test::HttpRequest*,
::test::HttpResponse*,
::google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl =
static_cast<brpc::Controller*>(cntl_base);
cntl->http_response().set_content_type("text/plain");
brpc::StopStyle stop_style = (_nrep == std::numeric_limits<size_t>::max()
? brpc::FORCE_STOP : brpc::WAIT_FOR_STOP);
butil::intrusive_ptr<brpc::ProgressiveAttachment> pa
= cntl->CreateProgressiveAttachment(stop_style);
if (pa == NULL) {
cntl->SetFailed("The socket was just failed");
return;
}
char buf[PA_DATA_LEN];
while (true) {
if (pa->Write(buf, sizeof(buf)) != 0) {
if (errno == brpc::EOVERCROWDED) {
LOG_EVERY_SECOND(INFO) << "full pa=" << pa.get();
bthread_usleep(10000);
continue;
} else {
_last_errno = errno;
break;
}
}
break;
}
// The remote client will not receive the data written to the
// progressive attachment when the controller failed.
cntl->SetFailed("Intentionally set controller failed");
done_guard.reset(NULL);
// Return value of Write after controller has failed should
// be less than zero.
CHECK_LT(pa->Write(buf, sizeof(buf)), 0);
CHECK_EQ(errno, ECANCELED);
}
void set_done_place(DonePlace done_place) { _done_place = done_place; }
size_t written_bytes() const { return _nwritten; }
bool ever_full() const { return _ever_full; }
int last_errno() const { return _last_errno; }
private:
DonePlace _done_place;
size_t _nrep;
size_t _nwritten;
bool _ever_full;
int _last_errno;
};
TEST_F(HttpTest, read_chunked_response_normally) {
const int port = 8923;
brpc::Server server;
DownloadServiceImpl svc;
EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, server.Start(port, NULL));
for (int i = 0; i < 3; ++i) {
svc.set_done_place((DonePlace)i);
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_HTTP;
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
brpc::Controller cntl;
cntl.http_request().uri() = "/DownloadService/Download";
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
std::string expected(PA_DATA_LEN, 0);
CopyPAPrefixedWithSeqNo(&expected[0], 0);
ASSERT_EQ(expected, cntl.response_attachment());
}
}
TEST_F(HttpTest, read_failed_chunked_response) {
const int port = 8923;
brpc::Server server;
DownloadServiceImpl svc;
EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, server.Start(port, NULL));
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_HTTP;
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
brpc::Controller cntl;
cntl.http_request().uri() = "/DownloadService/DownloadFailed";
cntl.response_will_be_read_progressively();
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
EXPECT_TRUE(cntl.response_attachment().empty());
ASSERT_TRUE(cntl.Failed());
ASSERT_NE(cntl.ErrorText().find("HTTP/1.1 500 Internal Server Error"),
std::string::npos) << cntl.ErrorText();
ASSERT_NE(cntl.ErrorText().find("Intentionally set controller failed"),
std::string::npos) << cntl.ErrorText();
ASSERT_EQ(0, svc.last_errno());
}
class ReadBody : public brpc::ProgressiveReader,
public brpc::SharedObject {
public:
ReadBody()
: _nread(0)
, _ncount(0)
, _destroyed(false) {
butil::intrusive_ptr<ReadBody>(this).detach(); // ref
}
butil::Status OnReadOnePart(const void* data, size_t length) {
_nread += length;
while (length > 0) {
size_t nappend = std::min(_buf.size() + length, PA_DATA_LEN) - _buf.size();
_buf.append((const char*)data, nappend);
data = (const char*)data + nappend;
length -= nappend;
if (_buf.size() >= PA_DATA_LEN) {
EXPECT_EQ(PA_DATA_LEN, _buf.size());
char expected[PA_DATA_LEN];
CopyPAPrefixedWithSeqNo(expected, _ncount++);
EXPECT_EQ(0, memcmp(expected, _buf.data(), PA_DATA_LEN))
<< "ncount=" << _ncount;
_buf.clear();
}
}
return butil::Status::OK();
}
void OnEndOfMessage(const butil::Status& st) {
butil::intrusive_ptr<ReadBody>(this, false); // deref
ASSERT_LT(_buf.size(), PA_DATA_LEN);
ASSERT_EQ(0, memcmp(_buf.data(), PA_DATA, _buf.size()));
_destroyed = true;
_destroying_st = st;
LOG(INFO) << "Destroy ReadBody=" << this << ", " << st;
}
bool destroyed() const { return _destroyed; }
const butil::Status& destroying_status() const { return _destroying_st; }
size_t read_bytes() const { return _nread; }
private:
std::string _buf;
size_t _nread;
size_t _ncount;
bool _destroyed;
butil::Status _destroying_st;
};
static const int GENERAL_DELAY_US = 300000; // 0.3s
TEST_F(HttpTest, read_long_body_progressively) {
butil::intrusive_ptr<ReadBody> reader;
{
const int port = 8923;
brpc::Server server;
DownloadServiceImpl svc(DONE_BEFORE_CREATE_PA,
std::numeric_limits<size_t>::max());
EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, server.Start(port, NULL));
{
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_HTTP;
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
{
brpc::Controller cntl;
cntl.response_will_be_read_progressively();
cntl.http_request().uri() = "/DownloadService/Download";
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_TRUE(cntl.response_attachment().empty());
reader.reset(new ReadBody);
cntl.ReadProgressiveAttachmentBy(reader.get());
size_t last_read = 0;
for (size_t i = 0; i < 3; ++i) {
sleep(1);
size_t current_read = reader->read_bytes();
LOG(INFO) << "read=" << current_read - last_read
<< " total=" << current_read;
last_read = current_read;
}
// Read something in past N seconds.
ASSERT_GT(last_read, (size_t)100000);
}
// the socket still holds a ref.
ASSERT_FALSE(reader->destroyed());
}
// Wait for recycling of the main socket.
usleep(GENERAL_DELAY_US);
// even if the main socket is recycled, the pooled socket for
// receiving data is not affected.
ASSERT_FALSE(reader->destroyed());
}
// Wait for close of the connection due to server's stopping.
usleep(GENERAL_DELAY_US);
ASSERT_TRUE(reader->destroyed());
ASSERT_EQ(ECONNRESET, reader->destroying_status().error_code());
}
TEST_F(HttpTest, read_short_body_progressively) {
butil::intrusive_ptr<ReadBody> reader;
const int port = 8923;
brpc::Server server;
const int NREP = 10000;
DownloadServiceImpl svc(DONE_BEFORE_CREATE_PA, NREP);
EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, server.Start(port, NULL));
{
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_HTTP;
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
{
brpc::Controller cntl;
cntl.response_will_be_read_progressively();
cntl.http_request().uri() = "/DownloadService/Download";
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_TRUE(cntl.response_attachment().empty());
reader.reset(new ReadBody);
cntl.ReadProgressiveAttachmentBy(reader.get());
size_t last_read = 0;
for (size_t i = 0; i < 3; ++i) {
sleep(1);
size_t current_read = reader->read_bytes();
LOG(INFO) << "read=" << current_read - last_read
<< " total=" << current_read;
last_read = current_read;
}
ASSERT_EQ(NREP * PA_DATA_LEN, svc.written_bytes());
ASSERT_EQ(NREP * PA_DATA_LEN, last_read);
}
ASSERT_TRUE(reader->destroyed());
ASSERT_EQ(0, reader->destroying_status().error_code());
}
}
TEST_F(HttpTest, read_progressively_after_cntl_destroys) {
butil::intrusive_ptr<ReadBody> reader;
{
const int port = 8923;
brpc::Server server;
DownloadServiceImpl svc(DONE_BEFORE_CREATE_PA,
std::numeric_limits<size_t>::max());
EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, server.Start(port, NULL));
{
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_HTTP;
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
{
brpc::Controller cntl;
cntl.response_will_be_read_progressively();
cntl.http_request().uri() = "/DownloadService/Download";
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_TRUE(cntl.response_attachment().empty());
reader.reset(new ReadBody);
cntl.ReadProgressiveAttachmentBy(reader.get());
}
size_t last_read = 0;
for (size_t i = 0; i < 3; ++i) {
sleep(1);
size_t current_read = reader->read_bytes();
LOG(INFO) << "read=" << current_read - last_read
<< " total=" << current_read;
last_read = current_read;
}
// Read something in past N seconds.
ASSERT_GT(last_read, (size_t)100000);
ASSERT_FALSE(reader->destroyed());
}
// Wait for recycling of the main socket.
usleep(GENERAL_DELAY_US);
ASSERT_FALSE(reader->destroyed());
}
// Wait for close of the connection due to server's stopping.
usleep(GENERAL_DELAY_US);
ASSERT_TRUE(reader->destroyed());
ASSERT_EQ(ECONNRESET, reader->destroying_status().error_code());
}
TEST_F(HttpTest, read_progressively_after_long_delay) {
butil::intrusive_ptr<ReadBody> reader;
{
const int port = 8923;
brpc::Server server;
DownloadServiceImpl svc(DONE_BEFORE_CREATE_PA,
std::numeric_limits<size_t>::max());
EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, server.Start(port, NULL));
{
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_HTTP;
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
{
brpc::Controller cntl;
cntl.response_will_be_read_progressively();
cntl.http_request().uri() = "/DownloadService/Download";
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_TRUE(cntl.response_attachment().empty());
LOG(INFO) << "Sleep 3 seconds to make PA at server-side full";
sleep(3);
EXPECT_TRUE(svc.ever_full());
ASSERT_EQ(0, svc.last_errno());
reader.reset(new ReadBody);
cntl.ReadProgressiveAttachmentBy(reader.get());
size_t last_read = 0;
for (size_t i = 0; i < 3; ++i) {
sleep(1);
size_t current_read = reader->read_bytes();
LOG(INFO) << "read=" << current_read - last_read
<< " total=" << current_read;
last_read = current_read;
}
// Read something in past N seconds.
ASSERT_GT(last_read, (size_t)100000);
}
ASSERT_FALSE(reader->destroyed());
}
// Wait for recycling of the main socket.
usleep(GENERAL_DELAY_US);
ASSERT_FALSE(reader->destroyed());
}
// Wait for close of the connection due to server's stopping.
usleep(GENERAL_DELAY_US);
ASSERT_TRUE(reader->destroyed());
ASSERT_EQ(ECONNRESET, reader->destroying_status().error_code());
}
TEST_F(HttpTest, skip_progressive_reading) {
const int port = 8923;
brpc::Server server;
DownloadServiceImpl svc(DONE_BEFORE_CREATE_PA,
std::numeric_limits<size_t>::max());
EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, server.Start(port, NULL));
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_HTTP;
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
{
brpc::Controller cntl;
cntl.response_will_be_read_progressively();
cntl.http_request().uri() = "/DownloadService/Download";
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_TRUE(cntl.response_attachment().empty());
}
const size_t old_written_bytes = svc.written_bytes();
LOG(INFO) << "Sleep 3 seconds after destroy of Controller";
sleep(3);
const size_t new_written_bytes = svc.written_bytes();
ASSERT_EQ(0, svc.last_errno());
LOG(INFO) << "Server still wrote " << new_written_bytes - old_written_bytes;
// The server side still wrote things.
ASSERT_GT(new_written_bytes - old_written_bytes, (size_t)100000);
}
class AlwaysFailRead : public brpc::ProgressiveReader {
public:
// @ProgressiveReader
butil::Status OnReadOnePart(const void* /*data*/, size_t /*length*/) {
return butil::Status(-1, "intended fail at %s:%d", __FILE__, __LINE__);
}
void OnEndOfMessage(const butil::Status& st) {
LOG(INFO) << "Destroy " << this << ": " << st;
delete this;
}
};
TEST_F(HttpTest, failed_on_read_one_part) {
const int port = 8923;
brpc::Server server;
DownloadServiceImpl svc(DONE_BEFORE_CREATE_PA,
std::numeric_limits<size_t>::max());
EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, server.Start(port, NULL));
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_HTTP;
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
{
brpc::Controller cntl;
cntl.response_will_be_read_progressively();
cntl.http_request().uri() = "/DownloadService/Download";
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_TRUE(cntl.response_attachment().empty());
cntl.ReadProgressiveAttachmentBy(new AlwaysFailRead);
}
LOG(INFO) << "Sleep 1 second";
sleep(1);
ASSERT_NE(0, svc.last_errno());
}
TEST_F(HttpTest, broken_socket_stops_progressive_reading) {
butil::intrusive_ptr<ReadBody> reader;
const int port = 8923;
brpc::Server server;
DownloadServiceImpl svc(DONE_BEFORE_CREATE_PA,
std::numeric_limits<size_t>::max());
EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, server.Start(port, NULL));
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_HTTP;
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
{
brpc::Controller cntl;
cntl.response_will_be_read_progressively();
cntl.http_request().uri() = "/DownloadService/Download";
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
ASSERT_TRUE(cntl.response_attachment().empty());
reader.reset(new ReadBody);
cntl.ReadProgressiveAttachmentBy(reader.get());
size_t last_read = 0;
for (size_t i = 0; i < 3; ++i) {
sleep(1);
size_t current_read = reader->read_bytes();
LOG(INFO) << "read=" << current_read - last_read
<< " total=" << current_read;
last_read = current_read;
}
// Read something in past N seconds.
ASSERT_GT(last_read, (size_t)100000);
}
// the socket still holds a ref.
ASSERT_FALSE(reader->destroyed());
LOG(INFO) << "Stopping the server";
server.Stop(0);
server.Join();
// Wait for error reporting from the socket.
usleep(GENERAL_DELAY_US);
ASSERT_TRUE(reader->destroyed());
ASSERT_EQ(ECONNRESET, reader->destroying_status().error_code());
}
static const std::string TEST_PROGRESSIVE_HEADER = "Progressive";
static const std::string TEST_PROGRESSIVE_HEADER_VAL = "Progressive-val";
class ServerProgressiveReader : public ReadBody {
public:
ServerProgressiveReader(brpc::Controller* cntl, google::protobuf::Closure* done)
: _cntl(cntl)
, _done(done) {}
// @ProgressiveReader
void OnEndOfMessage(const butil::Status& st) {
butil::intrusive_ptr<ReadBody>(this);
brpc::ClosureGuard done_guard(_done);
ASSERT_LT(_buf.size(), PA_DATA_LEN);
ASSERT_EQ(0, memcmp(_buf.data(), PA_DATA, _buf.size()));
_destroyed = true;
_destroying_st = st;
LOG(INFO) << "Destroy ReadBody=" << this << ", " << st;
_cntl->response_attachment().append("Sucess");
}
private:
brpc::Controller* _cntl;
google::protobuf::Closure* _done;
};
class ServerAlwaysFailReader : public brpc::ProgressiveReader {
public:
ServerAlwaysFailReader(brpc::Controller* cntl, google::protobuf::Closure* done)
: _cntl(cntl)
, _done(done) {}
// @ProgressiveReader
butil::Status OnReadOnePart(const void* /*data*/, size_t /*length*/) {
return butil::Status(-1, "intended fail at %s:%d", __FILE__, __LINE__);
}
void OnEndOfMessage(const butil::Status& st) {
brpc::ClosureGuard done_guard(_done);
CHECK_EQ(-1, st.error_code());
_cntl->SetFailed("Must Failed");
LOG(INFO) << "Destroy " << this << ": " << st;
delete this;
}
private:
brpc::Controller* _cntl;
google::protobuf::Closure* _done;
};
class UploadServiceImpl : public ::test::UploadService {
public:
void Upload(::google::protobuf::RpcController* controller,
const ::test::HttpRequest* request,
::test::HttpResponse* response,
::google::protobuf::Closure* done) {
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
check_header(cntl);
cntl->request_will_be_read_progressively();
cntl->ReadProgressiveAttachmentBy(new ServerProgressiveReader(cntl, done));
}
void UploadFailed(::google::protobuf::RpcController* controller,
const ::test::HttpRequest* request,
::test::HttpResponse* response,
::google::protobuf::Closure* done) {
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
check_header(cntl);
cntl->request_will_be_read_progressively();
cntl->ReadProgressiveAttachmentBy(new ServerAlwaysFailReader(cntl, done));
}
private:
void check_header(brpc::Controller* cntl) {
const std::string* test_header = cntl->http_request().GetHeader(TEST_PROGRESSIVE_HEADER);
GOOGLE_CHECK_NOTNULL(test_header);
CHECK_EQ(*test_header, TEST_PROGRESSIVE_HEADER_VAL);
}
};
TEST_F(HttpTest, server_end_read_short_body_progressively) {
const int port = 8923;
brpc::ServiceOptions opt;
opt.enable_progressive_read = true;
opt.ownership = brpc::SERVER_DOESNT_OWN_SERVICE;
UploadServiceImpl upsvc;
brpc::Server server;
EXPECT_EQ(0, server.AddService(&upsvc, opt));
EXPECT_EQ(0, server.Start(port, NULL));
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_HTTP;
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
brpc::Controller cntl;
cntl.http_request().uri() = "/UploadService/Upload";
cntl.http_request().SetHeader(TEST_PROGRESSIVE_HEADER, TEST_PROGRESSIVE_HEADER_VAL);
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
ASSERT_GT(PA_DATA_LEN, 8u); // long enough to hold a 64-bit decimal.
char buf[PA_DATA_LEN];
for (size_t c = 0; c < 10000;) {
CopyPAPrefixedWithSeqNo(buf, c);
if (cntl.request_attachment().append(buf, sizeof(buf)) != 0) {
if (errno == brpc::EOVERCROWDED) {
LOG(INFO) << "full msg=" << cntl.request_attachment().to_string();
} else {
LOG(INFO) << "Error:" << errno;
}
break;
}
++c;
}
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
ASSERT_FALSE(cntl.Failed());
}
TEST_F(HttpTest, server_end_read_failed) {
const int port = 8923;
brpc::ServiceOptions opt;
opt.enable_progressive_read = true;
opt.ownership = brpc::SERVER_DOESNT_OWN_SERVICE;
UploadServiceImpl upsvc;
brpc::Server server;
EXPECT_EQ(0, server.AddService(&upsvc, opt));
EXPECT_EQ(0, server.Start(port, NULL));
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_HTTP;
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
brpc::Controller cntl;
cntl.http_request().uri() = "/UploadService/UploadFailed";
cntl.http_request().SetHeader(TEST_PROGRESSIVE_HEADER, TEST_PROGRESSIVE_HEADER_VAL);
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
ASSERT_GT(PA_DATA_LEN, 8u); // long enough to hold a 64-bit decimal.
char buf[PA_DATA_LEN];
for (size_t c = 0; c < 10;) {
CopyPAPrefixedWithSeqNo(buf, c);
if (cntl.request_attachment().append(buf, sizeof(buf)) != 0) {
if (errno == brpc::EOVERCROWDED) {
LOG(INFO) << "full msg=" << cntl.request_attachment().to_string();
} else {
LOG(INFO) << "Error:" << errno;
}
break;
}
++c;
}
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
ASSERT_TRUE(cntl.Failed());
}
TEST_F(HttpTest, http2_sanity) {
const int port = 8923;
brpc::Server server;
EXPECT_EQ(0, server.AddService(&_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, server.Start(port, NULL));
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = "h2";
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
// Check that the first request with size larger than the default window can
// be sent out, when remote settings are not received.
brpc::Controller cntl;
test::EchoRequest big_req;
test::EchoResponse res;
std::string message(2 * 1024 * 1024 /* 2M */, 'x');
big_req.set_message(message);
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
cntl.http_request().uri() = "/EchoService/Echo";
channel.CallMethod(NULL, &cntl, &big_req, &res, NULL);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(EXP_RESPONSE, res.message());
// socket replacement when streamId runs out, the initial streamId is a special
// value set in ctor of H2Context so that the number 15000 is enough to run out
// of stream.
test::EchoRequest req;
req.set_message(EXP_REQUEST);
for (int i = 0; i < 15000; ++i) {
brpc::Controller cntl;
cntl.http_request().set_content_type("application/json");
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
cntl.http_request().uri() = "/EchoService/Echo";
channel.CallMethod(NULL, &cntl, &req, &res, NULL);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(EXP_RESPONSE, res.message());
}
// check connection window size
brpc::SocketUniquePtr main_ptr;
brpc::SocketUniquePtr agent_ptr;
EXPECT_EQ(brpc::Socket::Address(channel._server_id, &main_ptr), 0);
EXPECT_EQ(main_ptr->GetAgentSocket(&agent_ptr, NULL), 0);
brpc::policy::H2Context* ctx = static_cast<brpc::policy::H2Context*>(agent_ptr->parsing_context());
ASSERT_GT(ctx->_remote_window_left.load(butil::memory_order_relaxed),
brpc::H2Settings::DEFAULT_INITIAL_WINDOW_SIZE / 2);
}
TEST_F(HttpTest, http2_ping) {
// This test injects PING frames before and after header and data.
brpc::Controller cntl;
// Prepare request
butil::IOBuf req_out;
int h2_stream_id = 0;
MakeH2EchoRequestBuf(&req_out, &cntl, &h2_stream_id);
// Prepare response
butil::IOBuf res_out;
char pingbuf[9 /*FRAME_HEAD_SIZE*/ + 8 /*Opaque Data*/];
brpc::policy::SerializeFrameHead(pingbuf, 8, brpc::policy::H2_FRAME_PING, 0, 0);
// insert ping before header and data
res_out.append(pingbuf, sizeof(pingbuf));
MakeH2EchoResponseBuf(&res_out, h2_stream_id);
// insert ping after header and data
res_out.append(pingbuf, sizeof(pingbuf));
// parse response
brpc::ParseResult res_pr =
brpc::policy::ParseH2Message(&res_out, _h2_client_sock.get(), false, NULL);
ASSERT_TRUE(res_pr.is_ok());
// process response
ProcessMessage(brpc::policy::ProcessHttpResponse, res_pr.message(), false);
ASSERT_FALSE(cntl.Failed());
}
inline void SaveUint32(void* out, uint32_t v) {
uint8_t* p = (uint8_t*)out;
p[0] = (v >> 24) & 0xFF;
p[1] = (v >> 16) & 0xFF;
p[2] = (v >> 8) & 0xFF;
p[3] = v & 0xFF;
}
TEST_F(HttpTest, http2_rst_before_header) {
brpc::Controller cntl;
// Prepare request
butil::IOBuf req_out;
int h2_stream_id = 0;
MakeH2EchoRequestBuf(&req_out, &cntl, &h2_stream_id);
// Prepare response
butil::IOBuf res_out;
char rstbuf[9 /*FRAME_HEAD_SIZE*/ + 4];
brpc::policy::SerializeFrameHead(rstbuf, 4, brpc::policy::H2_FRAME_RST_STREAM, 0, h2_stream_id);
SaveUint32(rstbuf + 9, brpc::H2_INTERNAL_ERROR);
res_out.append(rstbuf, sizeof(rstbuf));
MakeH2EchoResponseBuf(&res_out, h2_stream_id);
// parse response
brpc::ParseResult res_pr =
brpc::policy::ParseH2Message(&res_out, _h2_client_sock.get(), false, NULL);
ASSERT_TRUE(res_pr.is_ok());
// process response
ProcessMessage(brpc::policy::ProcessHttpResponse, res_pr.message(), false);
ASSERT_TRUE(cntl.Failed());
ASSERT_TRUE(cntl.ErrorCode() == brpc::EHTTP);
ASSERT_TRUE(cntl.http_response().status_code() == brpc::HTTP_STATUS_INTERNAL_SERVER_ERROR);
}
TEST_F(HttpTest, http2_rst_after_header_and_data) {
brpc::Controller cntl;
// Prepare request
butil::IOBuf req_out;
int h2_stream_id = 0;
MakeH2EchoRequestBuf(&req_out, &cntl, &h2_stream_id);
// Prepare response
butil::IOBuf res_out;
MakeH2EchoResponseBuf(&res_out, h2_stream_id);
char rstbuf[9 /*FRAME_HEAD_SIZE*/ + 4];
brpc::policy::SerializeFrameHead(rstbuf, 4, brpc::policy::H2_FRAME_RST_STREAM, 0, h2_stream_id);
SaveUint32(rstbuf + 9, brpc::H2_INTERNAL_ERROR);
res_out.append(rstbuf, sizeof(rstbuf));
// parse response
brpc::ParseResult res_pr =
brpc::policy::ParseH2Message(&res_out, _h2_client_sock.get(), false, NULL);
ASSERT_TRUE(res_pr.is_ok());
// process response
ProcessMessage(brpc::policy::ProcessHttpResponse, res_pr.message(), false);
ASSERT_FALSE(cntl.Failed());
ASSERT_TRUE(cntl.http_response().status_code() == brpc::HTTP_STATUS_OK);
}
TEST_F(HttpTest, http2_window_used_up) {
brpc::Controller cntl;
butil::IOBuf request_buf;
test::EchoRequest req;
// longer message to trigger using up window size sooner
req.set_message("FLOW_CONTROL_FLOW_CONTROL");
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
cntl.http_request().set_content_type("application/proto");
brpc::policy::SerializeHttpRequest(&request_buf, &cntl, &req);
char settingsbuf[brpc::policy::FRAME_HEAD_SIZE + 36];
brpc::H2Settings h2_settings;
const size_t nb = brpc::policy::SerializeH2Settings(h2_settings, settingsbuf + brpc::policy::FRAME_HEAD_SIZE);
brpc::policy::SerializeFrameHead(settingsbuf, nb, brpc::policy::H2_FRAME_SETTINGS, 0, 0);
butil::IOBuf buf;
buf.append(settingsbuf, brpc::policy::FRAME_HEAD_SIZE + nb);
brpc::policy::ParseH2Message(&buf, _h2_client_sock.get(), false, NULL);
int nsuc = brpc::H2Settings::DEFAULT_INITIAL_WINDOW_SIZE / cntl.request_attachment().size();
for (int i = 0; i <= nsuc; i++) {
brpc::policy::H2UnsentRequest* h2_req = brpc::policy::H2UnsentRequest::New(&cntl);
cntl._current_call.stream_user_data = h2_req;
brpc::SocketMessage* socket_message = NULL;
brpc::policy::PackH2Request(NULL, &socket_message, cntl.call_id().value,
NULL, &cntl, request_buf, NULL);
butil::IOBuf dummy;
butil::Status st = socket_message->AppendAndDestroySelf(&dummy, _h2_client_sock.get());
if (i == nsuc) {
// the last message should fail according to flow control policy.
ASSERT_FALSE(st.ok());
ASSERT_TRUE(st.error_code() == brpc::ELIMIT);
ASSERT_TRUE(butil::StringPiece(st.error_str()).starts_with("remote_window_left is not enough"));
} else {
ASSERT_TRUE(st.ok());
}
h2_req->DestroyStreamUserData(_h2_client_sock, &cntl, 0, false);
}
}
TEST_F(HttpTest, http2_settings) {
char settingsbuf[brpc::policy::FRAME_HEAD_SIZE + 36];
brpc::H2Settings h2_settings;
h2_settings.header_table_size = 8192;
h2_settings.max_concurrent_streams = 1024;
h2_settings.stream_window_size= (1u << 29) - 1;
const size_t nb = brpc::policy::SerializeH2Settings(h2_settings, settingsbuf + brpc::policy::FRAME_HEAD_SIZE);
brpc::policy::SerializeFrameHead(settingsbuf, nb, brpc::policy::H2_FRAME_SETTINGS, 0, 0);
butil::IOBuf buf;
buf.append(settingsbuf, brpc::policy::FRAME_HEAD_SIZE + nb);
brpc::policy::H2Context* ctx = new brpc::policy::H2Context(_socket.get(), NULL);
CHECK_EQ(ctx->Init(), 0);
_socket->initialize_parsing_context(&ctx);
ctx->_conn_state = brpc::policy::H2_CONNECTION_READY;
// parse settings
brpc::policy::ParseH2Message(&buf, _socket.get(), false, NULL);
butil::IOPortal response_buf;
CHECK_EQ(response_buf.append_from_file_descriptor(_pipe_fds[0], 1024),
(ssize_t)brpc::policy::FRAME_HEAD_SIZE);
brpc::policy::H2FrameHead frame_head;
butil::IOBufBytesIterator it(response_buf);
ctx->ConsumeFrameHead(it, &frame_head);
CHECK_EQ(frame_head.type, brpc::policy::H2_FRAME_SETTINGS);
CHECK_EQ(frame_head.flags, 0x01 /* H2_FLAGS_ACK */);
CHECK_EQ(frame_head.stream_id, 0);
ASSERT_TRUE(ctx->_remote_settings.header_table_size == 8192);
ASSERT_TRUE(ctx->_remote_settings.max_concurrent_streams == 1024);
ASSERT_TRUE(ctx->_remote_settings.stream_window_size == (1u << 29) - 1);
}
TEST_F(HttpTest, http2_invalid_settings) {
{
brpc::Server server;
brpc::ServerOptions options;
options.h2_settings.stream_window_size = brpc::H2Settings::MAX_WINDOW_SIZE + 1;
ASSERT_EQ(-1, server.Start("127.0.0.1:8924", &options));
}
{
brpc::Server server;
brpc::ServerOptions options;
options.h2_settings.max_frame_size =
brpc::H2Settings::DEFAULT_MAX_FRAME_SIZE - 1;
ASSERT_EQ(-1, server.Start("127.0.0.1:8924", &options));
}
{
brpc::Server server;
brpc::ServerOptions options;
options.h2_settings.max_frame_size =
brpc::H2Settings::MAX_OF_MAX_FRAME_SIZE + 1;
ASSERT_EQ(-1, server.Start("127.0.0.1:8924", &options));
}
}
TEST_F(HttpTest, http2_not_closing_socket_when_rpc_timeout) {
const int port = 8923;
brpc::Server server;
EXPECT_EQ(0, server.AddService(&_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, server.Start(port, NULL));
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = "h2";
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
test::EchoRequest req;
test::EchoResponse res;
req.set_message(EXP_REQUEST);
{
// make a successful call to create the connection first
brpc::Controller cntl;
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
cntl.http_request().uri() = "/EchoService/Echo";
channel.CallMethod(NULL, &cntl, &req, &res, NULL);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(EXP_RESPONSE, res.message());
}
brpc::SocketUniquePtr main_ptr;
EXPECT_EQ(brpc::Socket::Address(channel._server_id, &main_ptr), 0);
brpc::SocketId agent_id = main_ptr->_agent_socket_id.load(butil::memory_order_relaxed);
for (int i = 0; i < 4; i++) {
brpc::Controller cntl;
cntl.set_timeout_ms(50);
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
cntl.http_request().uri() = "/EchoService/Echo?sleep_ms=300";
channel.CallMethod(NULL, &cntl, &req, &res, NULL);
ASSERT_TRUE(cntl.Failed());
brpc::SocketUniquePtr ptr;
brpc::SocketId id = main_ptr->_agent_socket_id.load(butil::memory_order_relaxed);
EXPECT_EQ(id, agent_id);
}
{
// make a successful call again to make sure agent_socket not changing
brpc::Controller cntl;
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
cntl.http_request().uri() = "/EchoService/Echo";
channel.CallMethod(NULL, &cntl, &req, &res, NULL);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(EXP_RESPONSE, res.message());
brpc::SocketUniquePtr ptr;
brpc::SocketId id = main_ptr->_agent_socket_id.load(butil::memory_order_relaxed);
EXPECT_EQ(id, agent_id);
}
}
TEST_F(HttpTest, http2_header_after_data) {
brpc::Controller cntl;
// Prepare request
butil::IOBuf req_out;
int h2_stream_id = 0;
MakeH2EchoRequestBuf(&req_out, &cntl, &h2_stream_id);
// Prepare response to res_out
butil::IOBuf res_out;
{
butil::IOBuf data_buf;
test::EchoResponse res;
res.set_message(EXP_RESPONSE);
{
butil::IOBufAsZeroCopyOutputStream wrapper(&data_buf);
EXPECT_TRUE(res.SerializeToZeroCopyStream(&wrapper));
}
brpc::policy::H2Context* ctx =
static_cast<brpc::policy::H2Context*>(_h2_client_sock->parsing_context());
brpc::HPacker& hpacker = ctx->hpacker();
butil::IOBufAppender header1_appender;
brpc::HPackOptions options;
options.encode_name = false; /* disable huffman encoding */
options.encode_value = false;
{
brpc::HPacker::Header header(":status", "200");
hpacker.Encode(&header1_appender, header, options);
}
{
brpc::HPacker::Header header("content-length",
butil::string_printf("%llu", (unsigned long long)data_buf.size()));
hpacker.Encode(&header1_appender, header, options);
}
{
brpc::HPacker::Header header(":status", "200");
hpacker.Encode(&header1_appender, header, options);
}
{
brpc::HPacker::Header header("content-type", "application/proto");
hpacker.Encode(&header1_appender, header, options);
}
{
brpc::HPacker::Header header("user-defined1", "a");
hpacker.Encode(&header1_appender, header, options);
}
butil::IOBuf header1;
header1_appender.move_to(header1);
char headbuf[brpc::policy::FRAME_HEAD_SIZE];
brpc::policy::SerializeFrameHead(headbuf, header1.size(),
brpc::policy::H2_FRAME_HEADERS, 0, h2_stream_id);
// append header1
res_out.append(headbuf, sizeof(headbuf));
res_out.append(butil::IOBuf::Movable(header1));
brpc::policy::SerializeFrameHead(headbuf, data_buf.size(),
brpc::policy::H2_FRAME_DATA, 0, h2_stream_id);
// append data
res_out.append(headbuf, sizeof(headbuf));
res_out.append(butil::IOBuf::Movable(data_buf));
butil::IOBufAppender header2_appender;
{
brpc::HPacker::Header header("user-defined1", "overwrite-a");
hpacker.Encode(&header2_appender, header, options);
}
{
brpc::HPacker::Header header("user-defined2", "b");
hpacker.Encode(&header2_appender, header, options);
}
butil::IOBuf header2;
header2_appender.move_to(header2);
brpc::policy::SerializeFrameHead(headbuf, header2.size(),
brpc::policy::H2_FRAME_HEADERS, 0x05/* end header and stream */,
h2_stream_id);
// append header2
res_out.append(headbuf, sizeof(headbuf));
res_out.append(butil::IOBuf::Movable(header2));
}
// parse response
brpc::ParseResult res_pr =
brpc::policy::ParseH2Message(&res_out, _h2_client_sock.get(), false, NULL);
ASSERT_TRUE(res_pr.is_ok());
// process response
ProcessMessage(brpc::policy::ProcessHttpResponse, res_pr.message(), false);
ASSERT_FALSE(cntl.Failed());
brpc::HttpHeader& res_header = cntl.http_response();
ASSERT_EQ(res_header.content_type(), "application/proto");
// Check overlapped header is overwritten by the latter.
const std::string* user_defined1 = res_header.GetHeader("user-defined1");
ASSERT_EQ(*user_defined1, "a,overwrite-a");
const std::string* user_defined2 = res_header.GetHeader("user-defined2");
ASSERT_EQ(*user_defined2, "b");
}
TEST_F(HttpTest, http2_goaway_sanity) {
brpc::Controller cntl;
// Prepare request
butil::IOBuf req_out;
int h2_stream_id = 0;
MakeH2EchoRequestBuf(&req_out, &cntl, &h2_stream_id);
// Prepare response
butil::IOBuf res_out;
MakeH2EchoResponseBuf(&res_out, h2_stream_id);
// append goaway
char goawaybuf[9 /*FRAME_HEAD_SIZE*/ + 8];
brpc::policy::SerializeFrameHead(goawaybuf, 8, brpc::policy::H2_FRAME_GOAWAY, 0, 0);
SaveUint32(goawaybuf + 9, 0x7fffd8ef /*last stream id*/);
SaveUint32(goawaybuf + 13, brpc::H2_NO_ERROR);
res_out.append(goawaybuf, sizeof(goawaybuf));
// parse response
brpc::ParseResult res_pr =
brpc::policy::ParseH2Message(&res_out, _h2_client_sock.get(), false, NULL);
ASSERT_TRUE(res_pr.is_ok());
// process response
ProcessMessage(brpc::policy::ProcessHttpResponse, res_pr.message(), false);
ASSERT_TRUE(!cntl.Failed());
// parse GOAWAY
res_pr = brpc::policy::ParseH2Message(&res_out, _h2_client_sock.get(), false, NULL);
ASSERT_EQ(res_pr.error(), brpc::PARSE_ERROR_NOT_ENOUGH_DATA);
// Since GOAWAY has been received, the next request should fail
brpc::policy::H2UnsentRequest* h2_req = brpc::policy::H2UnsentRequest::New(&cntl);
cntl._current_call.stream_user_data = h2_req;
brpc::SocketMessage* socket_message = NULL;
brpc::policy::PackH2Request(NULL, &socket_message, cntl.call_id().value,
NULL, &cntl, butil::IOBuf(), NULL);
butil::IOBuf dummy;
butil::Status st = socket_message->AppendAndDestroySelf(&dummy, _h2_client_sock.get());
ASSERT_EQ(st.error_code(), brpc::ELOGOFF);
ASSERT_TRUE(st.error_data().ends_with("the connection just issued GOAWAY"));
}
class AfterRecevingGoAway : public ::google::protobuf::Closure {
public:
void Run() {
ASSERT_EQ(brpc::EHTTP, cntl.ErrorCode());
delete this;
}
brpc::Controller cntl;
};
TEST_F(HttpTest, http2_handle_goaway_streams) {
const butil::EndPoint ep(butil::IP_ANY, 5961);
butil::fd_guard listenfd(butil::tcp_listen(ep));
ASSERT_GT(listenfd, 0);
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_H2;
ASSERT_EQ(0, channel.Init(ep, &options));
int req_size = 10;
std::vector<brpc::CallId> ids(req_size);
for (int i = 0; i < req_size; i++) {
AfterRecevingGoAway* done = new AfterRecevingGoAway;
brpc::Controller& cntl = done->cntl;
ids.push_back(cntl.call_id());
cntl.set_timeout_ms(-1);
cntl.http_request().uri() = "/it-doesnt-matter";
channel.CallMethod(NULL, &cntl, NULL, NULL, done);
}
int servfd = accept(listenfd, NULL, NULL);
ASSERT_GT(servfd, 0);
// Sleep for a while to make sure that server has received all data.
bthread_usleep(2000);
char goawaybuf[brpc::policy::FRAME_HEAD_SIZE + 8];
SerializeFrameHead(goawaybuf, 8, brpc::policy::H2_FRAME_GOAWAY, 0, 0);
SaveUint32(goawaybuf + brpc::policy::FRAME_HEAD_SIZE, 0);
SaveUint32(goawaybuf + brpc::policy::FRAME_HEAD_SIZE + 4, 0);
ASSERT_EQ((ssize_t)brpc::policy::FRAME_HEAD_SIZE + 8, ::write(servfd, goawaybuf, brpc::policy::FRAME_HEAD_SIZE + 8));
// After receving GOAWAY, the callbacks in client should be run correctly.
for (int i = 0; i < req_size; i++) {
brpc::Join(ids[i]);
}
}
TEST_F(HttpTest, spring_protobuf_content_type) {
const int port = 8923;
brpc::Server server;
EXPECT_EQ(0, server.AddService(&_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, server.Start(port, nullptr));
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = "http";
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
brpc::Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(EXP_REQUEST);
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
cntl.http_request().uri() = "/EchoService/Echo";
cntl.http_request().set_content_type("application/x-protobuf");
cntl.request_attachment().append(req.SerializeAsString());
channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ("application/x-protobuf", cntl.http_response().content_type());
ASSERT_TRUE(res.ParseFromString(cntl.response_attachment().to_string()));
ASSERT_EQ(EXP_RESPONSE, res.message());
brpc::Controller cntl2;
test::EchoService_Stub stub(&channel);
req.set_message(EXP_REQUEST);
res.Clear();
cntl2.http_request().set_content_type("application/x-protobuf");
stub.Echo(&cntl2, &req, &res, nullptr);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ(EXP_RESPONSE, res.message());
ASSERT_EQ("application/x-protobuf", cntl.http_response().content_type());
}
TEST_F(HttpTest, dump_http_request) {
// save origin value of gflag
auto rpc_dump_dir = brpc::FLAGS_rpc_dump_dir;
auto rpc_dump_max_requests_in_one_file = brpc::FLAGS_rpc_dump_max_requests_in_one_file;
// set gflag and global variable in order to be sure to dump request
brpc::FLAGS_rpc_dump = true;
brpc::FLAGS_rpc_dump_dir = "dump_http_request";
brpc::FLAGS_rpc_dump_max_requests_in_one_file = 1;
brpc::g_rpc_dump_sl.ever_grabbed = true;
brpc::g_rpc_dump_sl.sampling_range = bvar::COLLECTOR_SAMPLING_BASE;
// init channel
const int port = 8923;
brpc::Server server;
EXPECT_EQ(0, server.AddService(&_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, server.Start(port, nullptr));
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = "http";
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
// send request and dump it to file
{
test::EchoRequest req;
req.set_message(EXP_REQUEST);
std::string req_json;
ASSERT_TRUE(json2pb::ProtoMessageToJson(req, &req_json));
brpc::Controller cntl;
cntl.http_request().uri() = "/EchoService/Echo";
cntl.http_request().set_content_type("application/json");
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
cntl.request_attachment() = req_json;
channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr);
ASSERT_FALSE(cntl.Failed());
// sleep 1s, because rpc_dump doesn't run immediately
sleep(1);
}
// replay request from dump file
{
brpc::SampleIterator it(brpc::FLAGS_rpc_dump_dir);
brpc::SampledRequest* sample = it.Next();
ASSERT_NE(nullptr, sample);
std::unique_ptr<brpc::SampledRequest> sample_guard(sample);
// the logic of next code is same as that in rpc_replay.cpp
ASSERT_EQ(sample->meta.protocol_type(), brpc::PROTOCOL_HTTP);
brpc::Controller cntl;
cntl.reset_sampled_request(sample_guard.release());
brpc::HttpMessage http_message;
http_message.ParseFromIOBuf(sample->request);
cntl.http_request().Swap(http_message.header());
// clear origin Host in header
cntl.http_request().RemoveHeader("Host");
cntl.http_request().uri().set_host("");
cntl.request_attachment() = http_message.body().movable();
channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ("application/json", cntl.http_response().content_type());
std::string res_json = cntl.response_attachment().to_string();
test::EchoResponse res;
json2pb::Json2PbOptions options;
ASSERT_TRUE(json2pb::JsonToProtoMessage(res_json, &res, options));
ASSERT_EQ(EXP_RESPONSE, res.message());
}
// delete dump directory
butil::DeleteFile(butil::FilePath(brpc::FLAGS_rpc_dump_dir), true);
// restore gflag and global variable
brpc::FLAGS_rpc_dump = false;
brpc::FLAGS_rpc_dump_dir = rpc_dump_dir;
brpc::FLAGS_rpc_dump_max_requests_in_one_file = rpc_dump_max_requests_in_one_file;
brpc::g_rpc_dump_sl.ever_grabbed = false;
brpc::g_rpc_dump_sl.sampling_range = 0;
}
TEST_F(HttpTest, spring_protobuf_text_content_type) {
const int port = 8923;
brpc::Server server;
EXPECT_EQ(0, server.AddService(&_svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, server.Start(port, nullptr));
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = "http";
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
brpc::Controller cntl;
test::EchoRequest req;
test::EchoResponse res;
req.set_message(EXP_REQUEST);
cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
cntl.http_request().uri() = "/EchoService/Echo";
cntl.http_request().set_content_type("application/proto-text");
cntl.request_attachment().append(req.Utf8DebugString());
channel.CallMethod(nullptr, &cntl, nullptr, nullptr, nullptr);
ASSERT_FALSE(cntl.Failed());
ASSERT_EQ("application/proto-text", cntl.http_response().content_type());
ASSERT_TRUE(google::protobuf::TextFormat::ParseFromString(
cntl.response_attachment().to_string(), &res));
ASSERT_EQ(EXP_RESPONSE, res.message());
}
class HttpServiceImpl : public ::test::HttpService {
public:
void Head(::google::protobuf::RpcController* cntl_base,
const ::test::HttpRequest*,
::test::HttpResponse*,
::google::protobuf::Closure* done) override {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
ASSERT_EQ(cntl->http_request().method(), brpc::HTTP_METHOD_HEAD);
const std::string* index = cntl->http_request().GetHeader("x-db-index");
ASSERT_NE(nullptr, index);
int i;
ASSERT_TRUE(butil::StringToInt(*index, &i));
cntl->http_response().set_content_type("text/plain");
if (i % 2 == 0) {
cntl->http_response().SetHeader("Content-Length",
EXP_RESPONSE_CONTENT_LENGTH);
} else {
cntl->http_response().SetHeader("Transfer-Encoding",
EXP_RESPONSE_TRANSFER_ENCODING);
}
}
void Expect(::google::protobuf::RpcController* cntl_base,
const ::test::HttpRequest*,
::test::HttpResponse*,
::google::protobuf::Closure* done) override {
brpc::ClosureGuard done_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
const std::string* expect = cntl->http_request().GetHeader("Expect");
ASSERT_TRUE(expect != NULL);
ASSERT_EQ("100-continue", *expect);
ASSERT_EQ(cntl->http_request().method(), brpc::HTTP_METHOD_POST);
cntl->response_attachment().append("world");
}
};
TEST_F(HttpTest, http_head) {
const int port = 8923;
brpc::Server server;
HttpServiceImpl svc;
EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, server.Start(port, NULL));
brpc::Channel channel;
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_HTTP;
ASSERT_EQ(0, channel.Init(butil::EndPoint(butil::my_ip(), port), &options));
for (int i = 0; i < 100; ++i) {
brpc::Controller cntl;
cntl.http_request().set_method(brpc::HTTP_METHOD_HEAD);
cntl.http_request().uri().set_path("/HttpService/Head");
cntl.http_request().SetHeader("x-db-index", butil::IntToString(i));
channel.CallMethod(NULL, &cntl, NULL, NULL, NULL);
ASSERT_FALSE(cntl.Failed()) << cntl.ErrorText();
if (i % 2 == 0) {
const std::string* content_length
= cntl.http_response().GetHeader("content-length");
ASSERT_NE(nullptr, content_length);
ASSERT_EQ(EXP_RESPONSE_CONTENT_LENGTH, *content_length);
} else {
const std::string* transfer_encoding
= cntl.http_response().GetHeader("Transfer-Encoding");
ASSERT_NE(nullptr, transfer_encoding);
ASSERT_EQ(EXP_RESPONSE_TRANSFER_ENCODING, *transfer_encoding);
}
}
}
#define BRPC_CRLF "\r\n"
void MakeHttpRequestHeaders(butil::IOBuf* out,
brpc::HttpHeader* h,
const butil::EndPoint& remote_side) {
butil::IOBufBuilder os;
os << HttpMethod2Str(h->method()) << ' ';
const brpc::URI& uri = h->uri();
uri.PrintWithoutHost(os); // host is sent by "Host" header.
os << " HTTP/" << h->major_version() << '.'
<< h->minor_version() << BRPC_CRLF;
//rfc 7230#section-5.4:
//A client MUST send a Host header field in all HTTP/1.1 request
//messages. If the authority component is missing or undefined for
//the target URI, then a client MUST send a Host header field with an
//empty field-value.
//rfc 7231#sec4.3:
//the request-target consists of only the host name and port number of
//the tunnel destination, seperated by a colon. For example,
//Host: server.example.com:80
if (h->GetHeader("host") == NULL) {
os << "Host: ";
if (!uri.host().empty()) {
os << uri.host();
if (uri.port() >= 0) {
os << ':' << uri.port();
}
} else if (remote_side.port != 0) {
os << remote_side;
}
os << BRPC_CRLF;
}
if (!h->content_type().empty()) {
os << "Content-Type: " << h->content_type()
<< BRPC_CRLF;
}
for (brpc::HttpHeader::HeaderIterator it = h->HeaderBegin();
it != h->HeaderEnd(); ++it) {
os << it->first << ": " << it->second << BRPC_CRLF;
}
if (h->GetHeader("Accept") == NULL) {
os << "Accept: */*" BRPC_CRLF;
}
// The fake "curl" user-agent may let servers return plain-text results.
if (h->GetHeader("User-Agent") == NULL) {
os << "User-Agent: brpc/1.0 curl/7.0" BRPC_CRLF;
}
const std::string& user_info = h->uri().user_info();
if (!user_info.empty() && h->GetHeader("Authorization") == NULL) {
// NOTE: just assume user_info is well formatted, namely
// "<user_name>:<password>". Users are very unlikely to add extra
// characters in this part and even if users did, most of them are
// invalid and rejected by http_parser_parse_url().
std::string encoded_user_info;
butil::Base64Encode(user_info, &encoded_user_info);
os << "Authorization: Basic " << encoded_user_info << BRPC_CRLF;
}
os << BRPC_CRLF; // CRLF before content
os.move_to(*out);
}
#undef BRPC_CRLF
void ReadOneResponse(brpc::SocketUniquePtr& sock,
brpc::DestroyingPtr<brpc::policy::HttpContext>& imsg_guard) {
#if defined(OS_LINUX)
ASSERT_EQ(0, bthread_fd_wait(sock->fd(), EPOLLIN));
#elif defined(OS_MACOSX)
ASSERT_EQ(0, bthread_fd_wait(sock->fd(), EVFILT_READ));
#endif
butil::IOPortal read_buf;
int64_t start_time = butil::gettimeofday_us();
while (true) {
const ssize_t nr = read_buf.append_from_file_descriptor(sock->fd(), 4096);
LOG(INFO) << "nr=" << nr;
LOG(INFO) << butil::ToPrintableString(read_buf);
ASSERT_TRUE(nr > 0 || (nr < 0 && errno == EAGAIN));
if (errno == EAGAIN) {
ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L) << "Too long!";
bthread_usleep(1000);
continue;
}
brpc::ParseResult pr = brpc::policy::ParseHttpMessage(&read_buf, sock.get(), false, NULL);
ASSERT_TRUE(pr.error() == brpc::PARSE_ERROR_NOT_ENOUGH_DATA || pr.is_ok());
if (pr.is_ok()) {
imsg_guard.reset(static_cast<brpc::policy::HttpContext*>(pr.message()));
break;
}
}
ASSERT_TRUE(read_buf.empty());
}
TEST_F(HttpTest, http_expect) {
const int port = 8923;
brpc::Server server;
HttpServiceImpl svc;
EXPECT_EQ(0, server.AddService(&svc, brpc::SERVER_DOESNT_OWN_SERVICE));
EXPECT_EQ(0, server.Start(port, NULL));
butil::EndPoint ep;
ASSERT_EQ(0, butil::str2endpoint("127.0.0.1:8923", &ep));
brpc::SocketOptions options;
options.remote_side = ep;
brpc::SocketId id;
ASSERT_EQ(0, brpc::Socket::Create(options, &id));
brpc::SocketUniquePtr sock;
ASSERT_EQ(0, brpc::Socket::Address(id, &sock));
butil::IOBuf content;
content.append("hello");
brpc::HttpHeader header;
header.set_method(brpc::HTTP_METHOD_POST);
header.uri().set_path("/HttpService/Expect");
header.SetHeader("Expect", "100-continue");
header.SetHeader("Content-Length", std::to_string(content.size()));
butil::IOBuf header_buf;
MakeHttpRequestHeaders(&header_buf, &header, ep);
LOG(INFO) << butil::ToPrintableString(header_buf);
butil::IOBuf request_buf(header_buf);
request_buf.append(content);
ASSERT_EQ(0, sock->Write(&header_buf));
int64_t start_time = butil::gettimeofday_us();
while (sock->fd() < 0) {
bthread_usleep(1000);
ASSERT_LT(butil::gettimeofday_us(), start_time + 1000000L) << "Too long!";
}
// 100 Continue
brpc::DestroyingPtr<brpc::policy::HttpContext> imsg_guard;
ReadOneResponse(sock, imsg_guard);
ASSERT_EQ(imsg_guard->header().status_code(), brpc::HTTP_STATUS_CONTINUE);
ASSERT_EQ(0, sock->Write(&content));
// 200 Ok
ReadOneResponse(sock, imsg_guard);
ASSERT_EQ(imsg_guard->header().status_code(), brpc::HTTP_STATUS_OK);
ASSERT_EQ(0, sock->Write(&request_buf));
// 200 Ok
ReadOneResponse(sock, imsg_guard);
ASSERT_EQ(imsg_guard->header().status_code(), brpc::HTTP_STATUS_OK);
}
} //namespace