blob: b4758ad8c87e0a31924a999b16591221984a70ef [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <iomanip>
#include <arpa/inet.h> // inet_aton
#include <fcntl.h> // O_CREAT
#include <sys/stat.h> // mkdir
#include <gflags/gflags.h>
#include <google/protobuf/descriptor.h> // ServiceDescriptor
#include "idl_options.pb.h" // option(idl_support)
#include "bthread/unstable.h" // bthread_keytable_pool_init
#include "butil/macros.h" // ARRAY_SIZE
#include "butil/fd_guard.h" // fd_guard
#include "butil/logging.h" // CHECK
#include "butil/time.h"
#include "butil/class_name.h"
#include "butil/string_printf.h"
#include "brpc/log.h"
#include "brpc/compress.h"
#include "brpc/policy/nova_pbrpc_protocol.h"
#include "brpc/global.h"
#include "brpc/socket_map.h" // SocketMapList
#include "brpc/acceptor.h" // Acceptor
#include "brpc/details/ssl_helper.h" // CreateServerSSLContext
#include "brpc/protocol.h" // ListProtocols
#include "brpc/nshead_service.h" // NsheadService
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
#include "brpc/thrift_service.h" // ThriftService
#endif
#include "brpc/builtin/bad_method_service.h" // BadMethodService
#include "brpc/builtin/get_favicon_service.h"
#include "brpc/builtin/get_js_service.h"
#include "brpc/builtin/version_service.h"
#include "brpc/builtin/health_service.h"
#include "brpc/builtin/list_service.h"
#include "brpc/builtin/status_service.h"
#include "brpc/builtin/protobufs_service.h"
#include "brpc/builtin/threads_service.h"
#include "brpc/builtin/vlog_service.h"
#include "brpc/builtin/index_service.h" // IndexService
#include "brpc/builtin/connections_service.h" // ConnectionsService
#include "brpc/builtin/flags_service.h" // FlagsService
#include "brpc/builtin/vars_service.h" // VarsService
#include "brpc/builtin/rpcz_service.h" // RpczService
#include "brpc/builtin/dir_service.h" // DirService
#include "brpc/builtin/pprof_service.h" // PProfService
#include "brpc/builtin/bthreads_service.h" // BthreadsService
#include "brpc/builtin/ids_service.h" // IdsService
#include "brpc/builtin/sockets_service.h" // SocketsService
#include "brpc/builtin/hotspots_service.h" // HotspotsService
#include "brpc/builtin/prometheus_metrics_service.h"
#include "brpc/details/method_status.h"
#include "brpc/load_balancer.h"
#include "brpc/naming_service.h"
#include "brpc/simple_data_pool.h"
#include "brpc/server.h"
#include "brpc/trackme.h"
#include "brpc/restful.h"
#include "brpc/rtmp.h"
#include "brpc/builtin/common.h" // GetProgramName
#include "brpc/details/tcmalloc_extension.h"
#include "brpc/rdma/rdma_helper.h"
inline std::ostream& operator<<(std::ostream& os, const timeval& tm) {
const char old_fill = os.fill();
os << tm.tv_sec << '.' << std::setw(6) << std::setfill('0') << tm.tv_usec;
os.fill(old_fill);
return os;
}
extern "C" {
void* bthread_get_assigned_data();
}
namespace brpc {
BAIDU_CASSERT(sizeof(int32_t) == sizeof(butil::subtle::Atomic32),
Atomic32_must_be_int32);
extern const char* const g_server_info_prefix = "rpc_server";
const char* status_str(Server::Status s) {
switch (s) {
case Server::UNINITIALIZED: return "UNINITIALIZED";
case Server::READY: return "READY";
case Server::RUNNING: return "RUNNING";
case Server::STOPPING: return "STOPPING";
}
return "UNKNOWN_STATUS";
}
butil::static_atomic<int> g_running_server_count = BUTIL_STATIC_ATOMIC_INIT(0);
// Following services may have security issues and are disabled by default.
DEFINE_bool(enable_dir_service, false, "Enable /dir");
DEFINE_bool(enable_threads_service, false, "Enable /threads");
DECLARE_int32(usercode_backup_threads);
DECLARE_bool(usercode_in_pthread);
const int INITIAL_SERVICE_CAP = 64;
const int INITIAL_CERT_MAP = 64;
// NOTE: never make s_ncore extern const whose ctor seq against other
// compilation units is undefined.
const int s_ncore = sysconf(_SC_NPROCESSORS_ONLN);
ServerOptions::ServerOptions()
: idle_timeout_sec(-1)
, nshead_service(NULL)
, thrift_service(NULL)
, mongo_service_adaptor(NULL)
, auth(NULL)
, server_owns_auth(false)
, num_threads(8)
, max_concurrency(0)
, session_local_data_factory(NULL)
, reserved_session_local_data(0)
, thread_local_data_factory(NULL)
, reserved_thread_local_data(0)
, bthread_init_fn(NULL)
, bthread_init_args(NULL)
, bthread_init_count(0)
, internal_port(-1)
, has_builtin_services(true)
, use_rdma(false)
, http_master_service(NULL)
, health_reporter(NULL)
, rtmp_service(NULL)
, redis_service(NULL) {
if (s_ncore > 0) {
num_threads = s_ncore + 1;
}
}
ServerSSLOptions* ServerOptions::mutable_ssl_options() {
if (!_ssl_options) {
_ssl_options.reset(new ServerSSLOptions);
}
return _ssl_options.get();
}
Server::MethodProperty::OpaqueParams::OpaqueParams()
: is_tabbed(false)
, allow_default_url(false)
, allow_http_body_to_pb(true)
, pb_bytes_to_base64(false)
, pb_single_repeated_to_array(false) {
}
Server::MethodProperty::MethodProperty()
: is_builtin_service(false)
, own_method_status(false)
, http_url(NULL)
, service(NULL)
, method(NULL)
, status(NULL) {
}
static timeval GetUptime(void* arg/*start_time*/) {
return butil::microseconds_to_timeval(butil::cpuwide_time_us() - (intptr_t)arg);
}
static void PrintStartTime(std::ostream& os, void* arg) {
// Print when this server was Server::Start()-ed.
time_t start_time = static_cast<Server*>(arg)->last_start_time();
struct tm timeinfo;
char buf[64];
strftime(buf, sizeof(buf), "%Y/%m/%d-%H:%M:%S",
localtime_r(&start_time, &timeinfo));
os << buf;
}
static void PrintSupportedLB(std::ostream& os, void*) {
LoadBalancerExtension()->List(os, ' ');
}
static void PrintSupportedNS(std::ostream& os, void*) {
NamingServiceExtension()->List(os, ' ');
}
static void PrintSupportedProtocols(std::ostream& os, void*) {
std::vector<Protocol> protocols;
ListProtocols(&protocols);
for (size_t i = 0; i < protocols.size(); ++i) {
if (i != 0) {
os << ' ';
}
os << (protocols[i].name ? protocols[i].name : "(null)");
}
}
static void PrintSupportedCompressions(std::ostream& os, void*) {
std::vector<CompressHandler> compressors;
ListCompressHandler(&compressors);
for (size_t i = 0; i < compressors.size(); ++i) {
if (i != 0) {
os << ' ';
}
os << (compressors[i].name ? compressors[i].name : "(null)");
}
}
static void PrintEnabledProfilers(std::ostream& os, void*) {
if (cpu_profiler_enabled) {
os << "cpu ";
}
if (IsHeapProfilerEnabled()) {
if (has_TCMALLOC_SAMPLE_PARAMETER()) {
os << "heap ";
} else {
os << "heap(no TCMALLOC_SAMPLE_PARAMETER in env) ";
}
}
os << "contention";
}
static bvar::PassiveStatus<std::string> s_lb_st(
"rpc_load_balancer", PrintSupportedLB, NULL);
static bvar::PassiveStatus<std::string> s_ns_st(
"rpc_naming_service", PrintSupportedNS, NULL);
static bvar::PassiveStatus<std::string> s_proto_st(
"rpc_protocols", PrintSupportedProtocols, NULL);
static bvar::PassiveStatus<std::string> s_comp_st(
"rpc_compressions", PrintSupportedCompressions, NULL);
static bvar::PassiveStatus<std::string> s_prof_st(
"rpc_profilers", PrintEnabledProfilers, NULL);
static int32_t GetConnectionCount(void* arg) {
ServerStatistics ss;
static_cast<Server*>(arg)->GetStat(&ss);
return ss.connection_count;
}
static int32_t GetServiceCount(void* arg) {
ServerStatistics ss;
static_cast<Server*>(arg)->GetStat(&ss);
return ss.user_service_count;
}
static int32_t GetBuiltinServiceCount(void* arg) {
ServerStatistics ss;
static_cast<Server*>(arg)->GetStat(&ss);
return ss.builtin_service_count;
}
static bvar::Vector<unsigned, 2> GetSessionLocalDataCount(void* arg) {
bvar::Vector<unsigned, 2> v;
SimpleDataPool::Stat s =
static_cast<Server*>(arg)->session_local_data_pool()->stat();
v[0] = s.ncreated - s.nfree;
v[1] = s.nfree;
return v;
}
static int cast_no_barrier_int(void* arg) {
return butil::subtle::NoBarrier_Load(static_cast<int*>(arg));
}
std::string Server::ServerPrefix() const {
if(_options.server_info_name.empty()) {
return butil::string_printf("%s_%d", g_server_info_prefix, listen_address().port);
} else {
return std::string(g_server_info_prefix) + "_" + _options.server_info_name;
}
}
void* Server::UpdateDerivedVars(void* arg) {
const int64_t start_us = butil::cpuwide_time_us();
Server* server = static_cast<Server*>(arg);
const std::string prefix = server->ServerPrefix();
std::vector<SocketId> conns;
std::vector<SocketId> internal_conns;
server->_nerror_bvar.expose_as(prefix, "error");
server->_eps_bvar.expose_as(prefix, "eps");
server->_concurrency_bvar.expose_as(prefix, "concurrency");
bvar::PassiveStatus<timeval> uptime_st(
prefix, "uptime", GetUptime, (void*)(intptr_t)start_us);
bvar::PassiveStatus<std::string> start_time_st(
prefix, "start_time", PrintStartTime, server);
bvar::PassiveStatus<int32_t> nconn_st(
prefix, "connection_count", GetConnectionCount, server);
bvar::PassiveStatus<int32_t> nservice_st(
prefix, "service_count", GetServiceCount, server);
bvar::PassiveStatus<int32_t> nbuiltinservice_st(
prefix, "builtin_service_count", GetBuiltinServiceCount, server);
bvar::PassiveStatus<bvar::Vector<unsigned, 2> > nsessiondata_st(
GetSessionLocalDataCount, server);
if (server->session_local_data_pool()) {
nsessiondata_st.expose_as(prefix, "session_local_data_count");
nsessiondata_st.set_vector_names("using,free");
}
std::string mprefix = prefix;
for (MethodMap::iterator it = server->_method_map.begin();
it != server->_method_map.end(); ++it) {
// Not expose counters on builtin services.
if (!it->second.is_builtin_service) {
mprefix.resize(prefix.size());
mprefix.push_back('_');
bvar::to_underscored_name(&mprefix, it->second.method->full_name());
it->second.status->Expose(mprefix);
}
}
if (server->options().nshead_service) {
server->options().nshead_service->Expose(prefix);
}
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
if (server->options().thrift_service) {
server->options().thrift_service->Expose(prefix);
}
#endif
int64_t last_time = butil::gettimeofday_us();
int consecutive_nosleep = 0;
while (1) {
const int64_t sleep_us = 1000000L + last_time - butil::gettimeofday_us();
if (sleep_us < 1000L) {
if (++consecutive_nosleep >= 2) {
consecutive_nosleep = 0;
LOG(WARNING) << __FUNCTION__ << " is too busy!";
}
} else {
consecutive_nosleep = 0;
if (bthread_usleep(sleep_us) < 0) {
PLOG_IF(ERROR, errno != ESTOP) << "Fail to sleep";
return NULL;
}
}
last_time = butil::gettimeofday_us();
// Update stats of accepted sockets.
if (server->_am) {
server->_am->ListConnections(&conns);
}
if (server->_internal_am) {
server->_internal_am->ListConnections(&internal_conns);
}
const int64_t now_ms = butil::cpuwide_time_ms();
for (size_t i = 0; i < conns.size(); ++i) {
SocketUniquePtr ptr;
if (Socket::Address(conns[i], &ptr) == 0) {
ptr->UpdateStatsEverySecond(now_ms);
}
}
for (size_t i = 0; i < internal_conns.size(); ++i) {
SocketUniquePtr ptr;
if (Socket::Address(internal_conns[i], &ptr) == 0) {
ptr->UpdateStatsEverySecond(now_ms);
}
}
}
}
const std::string& Server::ServiceProperty::service_name() const {
if (service) {
return service->GetDescriptor()->full_name();
} else if (restful_map) {
return restful_map->service_name();
}
const static std::string s_unknown_name = "";
return s_unknown_name;
}
Server::Server(ProfilerLinker)
: _session_local_data_pool(NULL)
, _status(UNINITIALIZED)
, _builtin_service_count(0)
, _virtual_service_count(0)
, _failed_to_set_max_concurrency_of_method(false)
, _am(NULL)
, _internal_am(NULL)
, _first_service(NULL)
, _tab_info_list(NULL)
, _global_restful_map(NULL)
, _last_start_time(0)
, _derivative_thread(INVALID_BTHREAD)
, _keytable_pool(NULL)
, _eps_bvar(&_nerror_bvar)
, _concurrency(0)
, _concurrency_bvar(cast_no_barrier_int, &_concurrency) {
BAIDU_CASSERT(offsetof(Server, _concurrency) % 64 == 0,
Server_concurrency_must_be_aligned_by_cacheline);
}
Server::~Server() {
Stop(0);
Join();
ClearServices();
FreeSSLContexts();
delete _session_local_data_pool;
_session_local_data_pool = NULL;
delete _options.nshead_service;
_options.nshead_service = NULL;
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
delete _options.thrift_service;
_options.thrift_service = NULL;
#endif
delete _options.http_master_service;
_options.http_master_service = NULL;
delete _am;
_am = NULL;
delete _internal_am;
_internal_am = NULL;
delete _tab_info_list;
_tab_info_list = NULL;
delete _global_restful_map;
_global_restful_map = NULL;
if (!_options.pid_file.empty()) {
unlink(_options.pid_file.c_str());
}
if (_options.server_owns_auth) {
delete _options.auth;
_options.auth = NULL;
}
delete _options.redis_service;
_options.redis_service = NULL;
}
int Server::AddBuiltinServices() {
// Firstly add services shown in tabs.
if (AddBuiltinService(new (std::nothrow) StatusService)) {
LOG(ERROR) << "Fail to add StatusService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) VarsService)) {
LOG(ERROR) << "Fail to add VarsService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) ConnectionsService)) {
LOG(ERROR) << "Fail to add ConnectionsService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) FlagsService)) {
LOG(ERROR) << "Fail to add FlagsService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) RpczService)) {
LOG(ERROR) << "Fail to add RpczService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) HotspotsService)) {
LOG(ERROR) << "Fail to add HotspotsService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) IndexService)) {
LOG(ERROR) << "Fail to add IndexService";
return -1;
}
// Add other services.
if (AddBuiltinService(new (std::nothrow) VersionService(this))) {
LOG(ERROR) << "Fail to add VersionService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) HealthService)) {
LOG(ERROR) << "Fail to add HealthService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) ProtobufsService(this))) {
LOG(ERROR) << "Fail to add ProtobufsService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) BadMethodService)) {
LOG(ERROR) << "Fail to add BadMethodService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) ListService(this))) {
LOG(ERROR) << "Fail to add ListService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) PrometheusMetricsService)) {
LOG(ERROR) << "Fail to add MetricsService";
return -1;
}
if (FLAGS_enable_threads_service &&
AddBuiltinService(new (std::nothrow) ThreadsService)) {
LOG(ERROR) << "Fail to add ThreadsService";
return -1;
}
#if !BRPC_WITH_GLOG
if (AddBuiltinService(new (std::nothrow) VLogService)) {
LOG(ERROR) << "Fail to add VLogService";
return -1;
}
#endif
if (AddBuiltinService(new (std::nothrow) PProfService)) {
LOG(ERROR) << "Fail to add PProfService";
return -1;
}
if (FLAGS_enable_dir_service &&
AddBuiltinService(new (std::nothrow) DirService)) {
LOG(ERROR) << "Fail to add DirService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) BthreadsService)) {
LOG(ERROR) << "Fail to add BthreadsService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) IdsService)) {
LOG(ERROR) << "Fail to add IdsService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) SocketsService)) {
LOG(ERROR) << "Fail to add SocketsService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) GetFaviconService)) {
LOG(ERROR) << "Fail to add GetFaviconService";
return -1;
}
if (AddBuiltinService(new (std::nothrow) GetJsService)) {
LOG(ERROR) << "Fail to add GetJsService";
return -1;
}
return 0;
}
bool is_http_protocol(const char* name) {
if (name[0] != 'h') {
return false;
}
return strcmp(name, "http") == 0 || strcmp(name, "h2") == 0;
}
Acceptor* Server::BuildAcceptor() {
std::set<std::string> whitelist;
for (butil::StringSplitter sp(_options.enabled_protocols.c_str(), ' ');
sp; ++sp) {
std::string protocol(sp.field(), sp.length());
whitelist.insert(protocol);
}
const bool has_whitelist = !whitelist.empty();
Acceptor* acceptor = new (std::nothrow) Acceptor(_keytable_pool);
if (NULL == acceptor) {
LOG(ERROR) << "Fail to new Acceptor";
return NULL;
}
InputMessageHandler handler;
std::vector<Protocol> protocols;
ListProtocols(&protocols);
for (size_t i = 0; i < protocols.size(); ++i) {
if (protocols[i].process_request == NULL) {
// The protocol does not support server-side.
continue;
}
if (has_whitelist &&
!is_http_protocol(protocols[i].name) &&
!whitelist.erase(protocols[i].name)) {
// the protocol is not allowed to serve.
RPC_VLOG << "Skip protocol=" << protocols[i].name;
continue;
}
// `process_request' is required at server side
handler.parse = protocols[i].parse;
handler.process = protocols[i].process_request;
handler.verify = protocols[i].verify;
handler.arg = this;
handler.name = protocols[i].name;
if (acceptor->AddHandler(handler) != 0) {
LOG(ERROR) << "Fail to add handler into Acceptor("
<< acceptor << ')';
delete acceptor;
return NULL;
}
}
if (!whitelist.empty()) {
std::ostringstream err;
err << "ServerOptions.enabled_protocols has unknown protocols=`";
for (std::set<std::string>::const_iterator it = whitelist.begin();
it != whitelist.end(); ++it) {
err << *it << ' ';
}
err << '\'';
delete acceptor;
LOG(ERROR) << err.str();
return NULL;
}
return acceptor;
}
int Server::InitializeOnce() {
if (_status != UNINITIALIZED) {
return 0;
}
GlobalInitializeOrDie();
if (_status != UNINITIALIZED) {
return 0;
}
if (_fullname_service_map.init(INITIAL_SERVICE_CAP) != 0) {
LOG(ERROR) << "Fail to init _fullname_service_map";
return -1;
}
if (_service_map.init(INITIAL_SERVICE_CAP) != 0) {
LOG(ERROR) << "Fail to init _service_map";
return -1;
}
if (_method_map.init(INITIAL_SERVICE_CAP * 2) != 0) {
LOG(ERROR) << "Fail to init _method_map";
return -1;
}
if (_ssl_ctx_map.init(INITIAL_CERT_MAP) != 0) {
LOG(ERROR) << "Fail to init _ssl_ctx_map";
return -1;
}
_status = READY;
return 0;
}
static void* CreateServerTLS(const void* args) {
return static_cast<const DataFactory*>(args)->CreateData();
}
static void DestroyServerTLS(void* data, const void* void_factory) {
static_cast<const DataFactory*>(void_factory)->DestroyData(data);
}
struct BthreadInitArgs {
bool (*bthread_init_fn)(void* args); // default: NULL (do nothing)
void* bthread_init_args; // default: NULL
bool result;
bool done;
bool stop;
bthread_t th;
};
static void* BthreadInitEntry(void* void_args) {
BthreadInitArgs* args = (BthreadInitArgs*)void_args;
args->result = args->bthread_init_fn(args->bthread_init_args);
args->done = true;
while (!args->stop) {
bthread_usleep(1000);
}
return NULL;
}
struct RevertServerStatus {
inline void operator()(Server* s) const {
if (s != NULL) {
s->Stop(0);
s->Join();
}
}
};
static int get_port_from_fd(int fd) {
struct sockaddr_in addr;
socklen_t size = sizeof(addr);
if (getsockname(fd, (struct sockaddr*)&addr, &size) < 0) {
return -1;
}
return ntohs(addr.sin_port);
}
static bool CreateConcurrencyLimiter(const AdaptiveMaxConcurrency& amc,
ConcurrencyLimiter** out) {
if (amc.type() == AdaptiveMaxConcurrency::UNLIMITED()) {
*out = NULL;
return true;
}
const ConcurrencyLimiter* cl =
ConcurrencyLimiterExtension()->Find(amc.type().c_str());
if (cl == NULL) {
LOG(ERROR) << "Fail to find ConcurrencyLimiter by `" << amc.value() << "'";
return false;
}
ConcurrencyLimiter* cl_copy = cl->New(amc);
if (cl_copy == NULL) {
LOG(ERROR) << "Fail to new ConcurrencyLimiter";
return false;
}
*out = cl_copy;
return true;
}
#if BRPC_WITH_RDMA
static bool OptionsAvailableOverRdma(const ServerOptions* opt) {
if (opt->rtmp_service) {
LOG(WARNING) << "RTMP is not supported by RDMA";
return false;
}
if (opt->has_ssl_options()) {
LOG(WARNING) << "SSL is not supported by RDMA";
return false;
}
if (opt->nshead_service) {
LOG(WARNING) << "NSHEAD is not supported by RDMA";
return false;
}
if (opt->mongo_service_adaptor) {
LOG(WARNING) << "MONGO is not supported by RDMA";
return false;
}
return true;
}
#endif
static AdaptiveMaxConcurrency g_default_max_concurrency_of_method(0);
int Server::StartInternal(const butil::EndPoint& endpoint,
const PortRange& port_range,
const ServerOptions *opt) {
std::unique_ptr<Server, RevertServerStatus> revert_server(this);
if (_failed_to_set_max_concurrency_of_method) {
_failed_to_set_max_concurrency_of_method = false;
LOG(ERROR) << "previous call to MaxConcurrencyOf() was failed, "
"fix it before starting server";
return -1;
}
if (InitializeOnce() != 0) {
LOG(ERROR) << "Fail to initialize Server[" << version() << ']';
return -1;
}
const Status st = status();
if (st != READY) {
if (st == RUNNING) {
LOG(ERROR) << "Server[" << version() << "] is already running on "
<< _listen_addr;
} else {
LOG(ERROR) << "Can't start Server[" << version()
<< "] which is " << status_str(status());
}
return -1;
}
if (opt) {
_options = *opt;
} else {
// Always reset to default options explicitly since `_options'
// may be the options for the last run or even bad options
_options = ServerOptions();
}
if (!_options.h2_settings.IsValid(true/*log_error*/)) {
LOG(ERROR) << "Invalid h2_settings";
return -1;
}
if (_options.use_rdma) {
#if BRPC_WITH_RDMA
if (!OptionsAvailableOverRdma(&_options)) {
return -1;
}
rdma::GlobalRdmaInitializeOrDie();
#else
LOG(WARNING) << "Cannot use rdma since brpc does not compile with rdma";
return -1;
#endif
}
if (_options.http_master_service) {
// Check requirements for http_master_service:
// has "default_method" & request/response have no fields
const google::protobuf::ServiceDescriptor* sd =
_options.http_master_service->GetDescriptor();
const google::protobuf::MethodDescriptor* md =
sd->FindMethodByName("default_method");
if (md == NULL) {
LOG(ERROR) << "http_master_service must have a method named `default_method'";
return -1;
}
if (md->input_type()->field_count() != 0) {
LOG(ERROR) << "The request type of http_master_service must have "
"no fields, actually " << md->input_type()->field_count();
return -1;
}
if (md->output_type()->field_count() != 0) {
LOG(ERROR) << "The response type of http_master_service must have "
"no fields, actually " << md->output_type()->field_count();
return -1;
}
}
// CAUTION:
// Following code may run multiple times if this server is started and
// stopped more than once. Reuse or delete previous resources!
if (_options.session_local_data_factory) {
if (_session_local_data_pool == NULL) {
_session_local_data_pool =
new (std::nothrow) SimpleDataPool(_options.session_local_data_factory);
if (NULL == _session_local_data_pool) {
LOG(ERROR) << "Fail to new SimpleDataPool";
return -1;
}
} else {
_session_local_data_pool->Reset(_options.session_local_data_factory);
}
_session_local_data_pool->Reserve(_options.reserved_session_local_data);
}
// Init _keytable_pool always. If the server was stopped before, the pool
// should be destroyed in Join().
_keytable_pool = new bthread_keytable_pool_t;
if (bthread_keytable_pool_init(_keytable_pool) != 0) {
LOG(ERROR) << "Fail to init _keytable_pool";
delete _keytable_pool;
_keytable_pool = NULL;
return -1;
}
if (_options.thread_local_data_factory) {
_tl_options.thread_local_data_factory = _options.thread_local_data_factory;
if (bthread_key_create2(&_tl_options.tls_key, DestroyServerTLS,
_options.thread_local_data_factory) != 0) {
LOG(ERROR) << "Fail to create thread-local key";
return -1;
}
if (_options.reserved_thread_local_data) {
bthread_keytable_pool_reserve(_keytable_pool,
_options.reserved_thread_local_data,
_tl_options.tls_key,
CreateServerTLS,
_options.thread_local_data_factory);
}
} else {
_tl_options = ThreadLocalOptions();
}
if (_options.bthread_init_count != 0 &&
_options.bthread_init_fn != NULL) {
// Create some special bthreads to call the init functions. The
// bthreads will not quit until all bthreads finish the init function.
BthreadInitArgs* init_args
= new BthreadInitArgs[_options.bthread_init_count];
size_t ncreated = 0;
for (size_t i = 0; i < _options.bthread_init_count; ++i, ++ncreated) {
init_args[i].bthread_init_fn = _options.bthread_init_fn;
init_args[i].bthread_init_args = _options.bthread_init_args;
init_args[i].result = false;
init_args[i].done = false;
init_args[i].stop = false;
bthread_attr_t tmp = BTHREAD_ATTR_NORMAL;
tmp.keytable_pool = _keytable_pool;
if (bthread_start_background(
&init_args[i].th, &tmp, BthreadInitEntry, &init_args[i]) != 0) {
break;
}
}
// Wait until all created bthreads finish the init function.
for (size_t i = 0; i < ncreated; ++i) {
while (!init_args[i].done) {
bthread_usleep(1000);
}
}
// Stop and join created bthreads.
for (size_t i = 0; i < ncreated; ++i) {
init_args[i].stop = true;
}
for (size_t i = 0; i < ncreated; ++i) {
bthread_join(init_args[i].th, NULL);
}
size_t num_failed_result = 0;
for (size_t i = 0; i < ncreated; ++i) {
if (!init_args[i].result) {
++num_failed_result;
}
}
delete [] init_args;
if (ncreated != _options.bthread_init_count) {
LOG(ERROR) << "Fail to create "
<< _options.bthread_init_count - ncreated << " bthreads";
return -1;
}
if (num_failed_result != 0) {
LOG(ERROR) << num_failed_result << " bthread_init_fn failed";
return -1;
}
}
// Free last SSL contexts
FreeSSLContexts();
if (_options.has_ssl_options()) {
CertInfo& default_cert = _options.mutable_ssl_options()->default_cert;
if (default_cert.certificate.empty()) {
LOG(ERROR) << "default_cert is empty";
return -1;
}
if (AddCertificate(default_cert) != 0) {
return -1;
}
_default_ssl_ctx = _ssl_ctx_map.begin()->second.ctx;
const std::vector<CertInfo>& certs = _options.mutable_ssl_options()->certs;
for (size_t i = 0; i < certs.size(); ++i) {
if (AddCertificate(certs[i]) != 0) {
return -1;
}
}
}
_concurrency = 0;
if (_options.has_builtin_services &&
_builtin_service_count <= 0 &&
AddBuiltinServices() != 0) {
LOG(ERROR) << "Fail to add builtin services";
return -1;
}
// If a server is started/stopped for mutiple times and one of the options
// sets has_builtin_service to true, builtin services will be enabled for
// any later re-start. Check this case and report to user.
if (!_options.has_builtin_services && _builtin_service_count > 0) {
LOG(ERROR) << "A server started/stopped for multiple times must be "
"consistent on ServerOptions.has_builtin_services";
return -1;
}
// Prepare all restful maps
for (ServiceMap::const_iterator it = _fullname_service_map.begin();
it != _fullname_service_map.end(); ++it) {
if (it->second.restful_map) {
it->second.restful_map->PrepareForFinding();
}
}
if (_global_restful_map) {
_global_restful_map->PrepareForFinding();
}
if (_options.num_threads > 0) {
if (FLAGS_usercode_in_pthread) {
_options.num_threads += FLAGS_usercode_backup_threads;
}
if (_options.num_threads < BTHREAD_MIN_CONCURRENCY) {
_options.num_threads = BTHREAD_MIN_CONCURRENCY;
}
bthread_setconcurrency(_options.num_threads);
}
for (MethodMap::iterator it = _method_map.begin();
it != _method_map.end(); ++it) {
if (it->second.is_builtin_service) {
it->second.status->SetConcurrencyLimiter(NULL);
} else {
const AdaptiveMaxConcurrency* amc = &it->second.max_concurrency;
if (amc->type() == AdaptiveMaxConcurrency::UNLIMITED()) {
amc = &_options.method_max_concurrency;
}
ConcurrencyLimiter* cl = NULL;
if (!CreateConcurrencyLimiter(*amc, &cl)) {
LOG(ERROR) << "Fail to create ConcurrencyLimiter for method";
return -1;
}
it->second.status->SetConcurrencyLimiter(cl);
}
}
// Create listening ports
if (port_range.min_port > port_range.max_port) {
LOG(ERROR) << "Invalid port_range=[" << port_range.min_port << '-'
<< port_range.max_port << ']';
return -1;
}
if (butil::is_endpoint_extended(endpoint) &&
(port_range.min_port != endpoint.port || port_range.max_port != endpoint.port)) {
LOG(ERROR) << "Only IPv4 address supports port range feature";
return -1;
}
_listen_addr = endpoint;
for (int port = port_range.min_port; port <= port_range.max_port; ++port) {
_listen_addr.port = port;
butil::fd_guard sockfd(tcp_listen(_listen_addr));
if (sockfd < 0) {
if (port != port_range.max_port) { // not the last port, try next
continue;
}
if (port_range.min_port != port_range.max_port) {
LOG(ERROR) << "Fail to listen " << _listen_addr.ip
<< ":[" << port_range.min_port << '-'
<< port_range.max_port << ']';
} else {
LOG(ERROR) << "Fail to listen " << _listen_addr;
}
return -1;
}
if (_listen_addr.port == 0) {
// port=0 makes kernel dynamically select a port from
// https://en.wikipedia.org/wiki/Ephemeral_port
_listen_addr.port = get_port_from_fd(sockfd);
if (_listen_addr.port <= 0) {
LOG(ERROR) << "Fail to get port from fd=" << sockfd;
return -1;
}
}
if (_am == NULL) {
_am = BuildAcceptor();
if (NULL == _am) {
LOG(ERROR) << "Fail to build acceptor";
return -1;
}
_am->_use_rdma = _options.use_rdma;
}
// Set `_status' to RUNNING before accepting connections
// to prevent requests being rejected as ELOGOFF
_status = RUNNING;
time(&_last_start_time);
GenerateVersionIfNeeded();
g_running_server_count.fetch_add(1, butil::memory_order_relaxed);
// Pass ownership of `sockfd' to `_am'
if (_am->StartAccept(sockfd, _options.idle_timeout_sec,
_default_ssl_ctx) != 0) {
LOG(ERROR) << "Fail to start acceptor";
return -1;
}
sockfd.release();
break; // stop trying
}
if (_options.internal_port >= 0 && _options.has_builtin_services) {
if (_options.internal_port == _listen_addr.port) {
LOG(ERROR) << "ServerOptions.internal_port=" << _options.internal_port
<< " is same with port=" << _listen_addr.port << " to Start()";
return -1;
}
if (_options.internal_port == 0) {
LOG(ERROR) << "ServerOptions.internal_port cannot be 0, which"
" allocates a dynamic and probabaly unfiltered port,"
" against the purpose of \"being internal\".";
return -1;
}
if (butil::is_endpoint_extended(endpoint)) {
LOG(ERROR) << "internal_port is available in IPv4 address only";
return -1;
}
butil::EndPoint internal_point = _listen_addr;
internal_point.port = _options.internal_port;
butil::fd_guard sockfd(tcp_listen(internal_point));
if (sockfd < 0) {
LOG(ERROR) << "Fail to listen " << internal_point << " (internal)";
return -1;
}
if (NULL == _internal_am) {
_internal_am = BuildAcceptor();
if (NULL == _internal_am) {
LOG(ERROR) << "Fail to build internal acceptor";
return -1;
}
}
// Pass ownership of `sockfd' to `_internal_am'
if (_internal_am->StartAccept(sockfd, _options.idle_timeout_sec,
_default_ssl_ctx) != 0) {
LOG(ERROR) << "Fail to start internal_acceptor";
return -1;
}
sockfd.release();
}
PutPidFileIfNeeded();
// Launch _derivative_thread.
CHECK_EQ(INVALID_BTHREAD, _derivative_thread);
if (bthread_start_background(&_derivative_thread, NULL,
UpdateDerivedVars, this) != 0) {
LOG(ERROR) << "Fail to create _derivative_thread";
return -1;
}
// Print tips to server launcher.
if (butil::is_endpoint_extended(_listen_addr)) {
const char* builtin_msg = _options.has_builtin_services ? " with builtin service" : "";
LOG(INFO) << "Server[" << version() << "] is serving on " << _listen_addr
<< builtin_msg << '.';
//TODO add TrackMe support
} else {
int http_port = _listen_addr.port;
std::ostringstream server_info;
server_info << "Server[" << version() << "] is serving on port="
<< _listen_addr.port;
if (_options.internal_port >= 0 && _options.has_builtin_services) {
http_port = _options.internal_port;
server_info << " and internal_port=" << _options.internal_port;
}
LOG(INFO) << server_info.str() << '.';
if (_options.has_builtin_services) {
LOG(INFO) << "Check out http://" << butil::my_hostname() << ':'
<< http_port << " in web browser.";
} else {
LOG(WARNING) << "Builtin services are disabled according to "
"ServerOptions.has_builtin_services";
}
// For trackme reporting
SetTrackMeAddress(butil::EndPoint(butil::my_ip(), http_port));
}
revert_server.release();
return 0;
}
int Server::Start(const butil::EndPoint& endpoint, const ServerOptions* opt) {
return StartInternal(
endpoint, PortRange(endpoint.port, endpoint.port), opt);
}
int Server::Start(const char* ip_port_str, const ServerOptions* opt) {
butil::EndPoint point;
if (str2endpoint(ip_port_str, &point) != 0 &&
hostname2endpoint(ip_port_str, &point) != 0) {
LOG(ERROR) << "Invalid address=`" << ip_port_str << '\'';
return -1;
}
return Start(point, opt);
}
int Server::Start(int port, const ServerOptions* opt) {
if (port < 0 || port > 65535) {
LOG(ERROR) << "Invalid port=" << port;
return -1;
}
return Start(butil::EndPoint(butil::IP_ANY, port), opt);
}
int Server::Start(const char* ip_str, PortRange port_range,
const ServerOptions *opt) {
butil::ip_t ip;
if (butil::str2ip(ip_str, &ip) != 0 &&
butil::hostname2ip(ip_str, &ip) != 0) {
LOG(ERROR) << "Invalid address=`" << ip_str << '\'';
return -1;
}
return StartInternal(butil::EndPoint(ip, 0), port_range, opt);
}
int Server::Start(PortRange port_range, const ServerOptions* opt) {
return StartInternal(butil::EndPoint(butil::IP_ANY, 0), port_range, opt);
}
int Server::Stop(int timeout_ms) {
if (_status != RUNNING) {
return -1;
}
_status = STOPPING;
LOG(INFO) << "Server[" << version() << "] is going to quit";
if (_am) {
_am->StopAccept(timeout_ms);
}
if (_internal_am) {
// TODO: calculate timeout?
_internal_am->StopAccept(timeout_ms);
}
return 0;
}
// NOTE: Join() can happen before Stop().
int Server::Join() {
if (_status != RUNNING && _status != STOPPING) {
return -1;
}
if (_am) {
_am->Join();
}
if (_internal_am) {
_internal_am->Join();
}
if (_session_local_data_pool) {
// We can't delete the pool right here because there's a bvar watching
// this pool in _derivative_thread which does not quit yet.
_session_local_data_pool->Reset(NULL);
}
if (_keytable_pool) {
// Destroy _keytable_pool to delete keytables inside. This has to be
// done here (before leaving Join) because it's legal for users to
// delete bthread keys after Join which makes related objects
// in KeyTables undeletable anymore and leaked.
CHECK_EQ(0, bthread_keytable_pool_destroy(_keytable_pool));
// TODO: Can't delete _keytable_pool which may be accessed by
// still-running bthreads (created by the server). The memory is
// leaked but servers are unlikely to be started/stopped frequently,
// the leak is acceptable in most scenarios.
_keytable_pool = NULL;
}
// Delete tls_key as well since we don't need it anymore.
if (_tl_options.tls_key != INVALID_BTHREAD_KEY) {
CHECK_EQ(0, bthread_key_delete(_tl_options.tls_key));
_tl_options.tls_key = INVALID_BTHREAD_KEY;
}
// Have to join _derivative_thread, which may assume that server is running
// and services in server are not mutated, otherwise data race happens
// between Add/RemoveService after Join() and the thread.
if (_derivative_thread != INVALID_BTHREAD) {
bthread_stop(_derivative_thread);
bthread_join(_derivative_thread, NULL);
_derivative_thread = INVALID_BTHREAD;
}
g_running_server_count.fetch_sub(1, butil::memory_order_relaxed);
_status = READY;
return 0;
}
int Server::AddServiceInternal(google::protobuf::Service* service,
bool is_builtin_service,
const ServiceOptions& svc_opt) {
if (NULL == service) {
LOG(ERROR) << "Parameter[service] is NULL!";
return -1;
}
const google::protobuf::ServiceDescriptor* sd = service->GetDescriptor();
if (sd->method_count() == 0) {
LOG(ERROR) << "service=" << sd->full_name()
<< " does not have any method.";
return -1;
}
if (InitializeOnce() != 0) {
LOG(ERROR) << "Fail to initialize Server[" << version() << ']';
return -1;
}
if (status() != READY) {
LOG(ERROR) << "Can't add service=" << sd->full_name() << " to Server["
<< version() << "] which is " << status_str(status());
return -1;
}
if (_fullname_service_map.seek(sd->full_name()) != NULL) {
LOG(ERROR) << "service=" << sd->full_name() << " already exists";
return -1;
}
ServiceProperty* old_ss = _service_map.seek(sd->name());
if (old_ss != NULL) {
// names conflict.
LOG(ERROR) << "Conflict service name between "
<< sd->full_name() << " and "
<< old_ss->service_name();
return -1;
}
// defined `option (idl_support) = true' or not.
const bool is_idl_support = sd->file()->options().GetExtension(idl_support);
Tabbed* tabbed = dynamic_cast<Tabbed*>(service);
for (int i = 0; i < sd->method_count(); ++i) {
const google::protobuf::MethodDescriptor* md = sd->method(i);
MethodProperty mp;
mp.is_builtin_service = is_builtin_service;
mp.own_method_status = true;
mp.params.is_tabbed = !!tabbed;
mp.params.allow_default_url = svc_opt.allow_default_url;
mp.params.allow_http_body_to_pb = svc_opt.allow_http_body_to_pb;
mp.params.pb_bytes_to_base64 = svc_opt.pb_bytes_to_base64;
mp.params.pb_single_repeated_to_array = svc_opt.pb_single_repeated_to_array;
mp.service = service;
mp.method = md;
mp.status = new MethodStatus;
_method_map[md->full_name()] = mp;
if (is_idl_support && sd->name() != sd->full_name()/*has ns*/) {
MethodProperty mp2 = mp;
mp2.own_method_status = false;
// have to map service_name + method_name as well because ubrpc
// does not send the namespace before service_name.
std::string full_name_wo_ns;
full_name_wo_ns.reserve(sd->name().size() + 1 + md->name().size());
full_name_wo_ns.append(sd->name());
full_name_wo_ns.push_back('.');
full_name_wo_ns.append(md->name());
if (_method_map.seek(full_name_wo_ns) == NULL) {
_method_map[full_name_wo_ns] = mp2;
} else {
LOG(ERROR) << '`' << full_name_wo_ns << "' already exists";
RemoveMethodsOf(service);
return -1;
}
}
}
const ServiceProperty ss = {
is_builtin_service, svc_opt.ownership, service, NULL };
_fullname_service_map[sd->full_name()] = ss;
_service_map[sd->name()] = ss;
if (is_builtin_service) {
++_builtin_service_count;
} else {
if (_first_service == NULL) {
_first_service = service;
}
}
butil::StringPiece restful_mappings = svc_opt.restful_mappings;
restful_mappings.trim_spaces();
if (!restful_mappings.empty()) {
// Parse the mappings.
std::vector<RestfulMapping> mappings;
if (!ParseRestfulMappings(restful_mappings, &mappings)) {
LOG(ERROR) << "Fail to parse mappings `" << restful_mappings << '\'';
RemoveService(service);
return -1;
}
if (mappings.empty()) {
// we already trimmed at the beginning, this is impossible.
LOG(ERROR) << "Impossible: Nothing in restful_mappings";
RemoveService(service);
return -1;
}
// Due the flexibility of URL matching, it's almost impossible to
// dispatch all kinds of URL to different methods *efficiently* just
// inside the HTTP protocol impl. We would like to match most-
// frequently-used URLs(/Service/Method) fastly and match more complex
// URLs inside separate functions.
// The trick is adding some entries inside the service maps without
// real services, mapping from the first component in the URL to a
// RestfulMap which does the complex matchings. For example:
// "/v1/send => SendFn, /v1/recv => RecvFn, /v2/check => CheckFn"
// We'll create 2 entries in service maps (_fullname_service_map and
// _service_map) mapping from "v1" and "v2" to 2 different RestfulMap
// respectively. When the URL is accessed, we extract the first
// component, find the RestfulMap and do url matchings. Regular url
// handling is not affected.
for (size_t i = 0; i < mappings.size(); ++i) {
const std::string full_method_name =
sd->full_name() + "." + mappings[i].method_name;
MethodProperty* mp = _method_map.seek(full_method_name);
if (mp == NULL) {
LOG(ERROR) << "Unknown method=`" << full_method_name << '\'';
RemoveService(service);
return -1;
}
const std::string& svc_name = mappings[i].path.service_name;
if (svc_name.empty()) {
if (_global_restful_map == NULL) {
_global_restful_map = new RestfulMap("");
}
MethodProperty::OpaqueParams params;
params.is_tabbed = !!tabbed;
params.allow_default_url = svc_opt.allow_default_url;
params.allow_http_body_to_pb = svc_opt.allow_http_body_to_pb;
params.pb_bytes_to_base64 = svc_opt.pb_bytes_to_base64;
params.pb_single_repeated_to_array = svc_opt.pb_single_repeated_to_array;
if (!_global_restful_map->AddMethod(
mappings[i].path, service, params,
mappings[i].method_name, mp->status)) {
LOG(ERROR) << "Fail to map `" << mappings[i].path
<< "' to `" << full_method_name << '\'';
RemoveService(service);
return -1;
}
if (mp->http_url == NULL) {
mp->http_url = new std::string(mappings[i].path.to_string());
} else {
if (!mp->http_url->empty()) {
mp->http_url->append(" @");
}
mp->http_url->append(mappings[i].path.to_string());
}
continue;
}
ServiceProperty* sp = _fullname_service_map.seek(svc_name);
ServiceProperty* sp2 = _service_map.seek(svc_name);
if (((!!sp) != (!!sp2)) ||
(sp != NULL && sp->service != sp2->service)) {
LOG(ERROR) << "Impossible: _fullname_service and _service_map are"
" inconsistent before inserting " << svc_name;
RemoveService(service);
return -1;
}
RestfulMap* m = NULL;
if (sp == NULL) {
m = new RestfulMap(mappings[i].path.service_name);
} else {
m = sp->restful_map;
}
MethodProperty::OpaqueParams params;
params.is_tabbed = !!tabbed;
params.allow_default_url = svc_opt.allow_default_url;
params.allow_http_body_to_pb = svc_opt.allow_http_body_to_pb;
params.pb_bytes_to_base64 = svc_opt.pb_bytes_to_base64;
params.pb_single_repeated_to_array = svc_opt.pb_single_repeated_to_array;
if (!m->AddMethod(mappings[i].path, service, params,
mappings[i].method_name, mp->status)) {
LOG(ERROR) << "Fail to map `" << mappings[i].path << "' to `"
<< sd->full_name() << '.' << mappings[i].method_name
<< '\'';
if (sp == NULL) {
delete m;
}
RemoveService(service);
return -1;
}
if (mp->http_url == NULL) {
mp->http_url = new std::string(mappings[i].path.to_string());
} else {
if (!mp->http_url->empty()) {
mp->http_url->append(" @");
}
mp->http_url->append(mappings[i].path.to_string());
}
if (sp == NULL) {
ServiceProperty ss =
{ false, SERVER_DOESNT_OWN_SERVICE, NULL, m };
_fullname_service_map[svc_name] = ss;
_service_map[svc_name] = ss;
++_virtual_service_count;
}
}
}
if (tabbed) {
if (_tab_info_list == NULL) {
_tab_info_list = new TabInfoList;
}
const size_t last_size = _tab_info_list->size();
tabbed->GetTabInfo(_tab_info_list);
const size_t cur_size = _tab_info_list->size();
for (size_t i = last_size; i != cur_size; ++i) {
const TabInfo& info = (*_tab_info_list)[i];
if (!info.valid()) {
LOG(ERROR) << "Invalid TabInfo: path=" << info.path
<< " tab_name=" << info.tab_name;
_tab_info_list->resize(last_size);
RemoveService(service);
return -1;
}
}
}
return 0;
}
ServiceOptions::ServiceOptions()
: ownership(SERVER_DOESNT_OWN_SERVICE)
, allow_default_url(false)
, allow_http_body_to_pb(true)
#ifdef BAIDU_INTERNAL
, pb_bytes_to_base64(false)
#else
, pb_bytes_to_base64(true)
#endif
, pb_single_repeated_to_array(false)
{}
int Server::AddService(google::protobuf::Service* service,
ServiceOwnership ownership) {
ServiceOptions options;
options.ownership = ownership;
return AddServiceInternal(service, false, options);
}
int Server::AddService(google::protobuf::Service* service,
ServiceOwnership ownership,
const butil::StringPiece& restful_mappings,
bool allow_default_url) {
ServiceOptions options;
options.ownership = ownership;
// TODO: This is weird
options.restful_mappings = restful_mappings.as_string();
options.allow_default_url = allow_default_url;
return AddServiceInternal(service, false, options);
}
int Server::AddService(google::protobuf::Service* service,
const ServiceOptions& options) {
return AddServiceInternal(service, false, options);
}
int Server::AddBuiltinService(google::protobuf::Service* service) {
ServiceOptions options;
options.ownership = SERVER_OWNS_SERVICE;
return AddServiceInternal(service, true, options);
}
void Server::RemoveMethodsOf(google::protobuf::Service* service) {
const google::protobuf::ServiceDescriptor* sd = service->GetDescriptor();
const bool is_idl_support = sd->file()->options().GetExtension(idl_support);
std::string full_name_wo_ns;
for (int i = 0; i < sd->method_count(); ++i) {
const google::protobuf::MethodDescriptor* md = sd->method(i);
MethodProperty* mp = _method_map.seek(md->full_name());
if (is_idl_support) {
full_name_wo_ns.clear();
full_name_wo_ns.reserve(sd->name().size() + 1 + md->name().size());
full_name_wo_ns.append(sd->name());
full_name_wo_ns.push_back('.');
full_name_wo_ns.append(md->name());
_method_map.erase(full_name_wo_ns);
}
if (mp == NULL) {
LOG(ERROR) << "Fail to find method=" << md->full_name();
continue;
}
if (mp->http_url) {
butil::StringSplitter at_sp(mp->http_url->c_str(), '@');
for (; at_sp; ++at_sp) {
butil::StringPiece path(at_sp.field(), at_sp.length());
path.trim_spaces();
butil::StringSplitter slash_sp(
path.data(), path.data() + path.size(), '/');
if (slash_sp == NULL) {
LOG(ERROR) << "Invalid http_url=" << *mp->http_url;
break;
}
butil::StringPiece v_svc_name(slash_sp.field(), slash_sp.length());
const ServiceProperty* vsp = FindServicePropertyByName(v_svc_name);
if (vsp == NULL) {
if (_global_restful_map) {
std::string path_str;
path.CopyToString(&path_str);
if (_global_restful_map->RemoveByPathString(path_str)) {
continue;
}
}
LOG(ERROR) << "Impossible: service=" << v_svc_name
<< " for restful_map does not exist";
break;
}
std::string path_str;
path.CopyToString(&path_str);
if (!vsp->restful_map->RemoveByPathString(path_str)) {
LOG(ERROR) << "Fail to find path=" << path
<< " in restful_map of service=" << v_svc_name;
}
}
delete mp->http_url;
}
if (mp->own_method_status) {
delete mp->status;
}
_method_map.erase(md->full_name());
}
}
int Server::RemoveService(google::protobuf::Service* service) {
if (NULL == service) {
LOG(ERROR) << "Parameter[service] is NULL";
return -1;
}
if (status() != READY) {
LOG(ERROR) << "Can't remove service="
<< service->GetDescriptor()->full_name() << " from Server["
<< version() << "] which is " << status_str(status());
return -1;
}
const google::protobuf::ServiceDescriptor* sd = service->GetDescriptor();
ServiceProperty* ss = _fullname_service_map.seek(sd->full_name());
if (ss == NULL) {
RPC_VLOG << "Fail to find service=" << sd->full_name().c_str();
return -1;
}
RemoveMethodsOf(service);
if (ss->ownership == SERVER_OWNS_SERVICE) {
delete ss->service;
}
const bool is_builtin_service = ss->is_builtin_service;
_fullname_service_map.erase(sd->full_name());
_service_map.erase(sd->name());
// Note: ss is invalidated.
if (is_builtin_service) {
--_builtin_service_count;
} else {
if (_first_service == service) {
_first_service = NULL;
}
}
return 0;
}
void Server::ClearServices() {
if (status() != READY) {
LOG_IF(ERROR, status() != UNINITIALIZED)
<< "Can't clear services from Server[" << version()
<< "] which is " << status_str(status());
return;
}
for (ServiceMap::const_iterator it = _fullname_service_map.begin();
it != _fullname_service_map.end(); ++it) {
if (it->second.ownership == SERVER_OWNS_SERVICE) {
delete it->second.service;
}
delete it->second.restful_map;
}
for (MethodMap::const_iterator it = _method_map.begin();
it != _method_map.end(); ++it) {
if (it->second.own_method_status) {
delete it->second.status;
}
delete it->second.http_url;
}
_fullname_service_map.clear();
_service_map.clear();
_method_map.clear();
_builtin_service_count = 0;
_virtual_service_count = 0;
_first_service = NULL;
}
google::protobuf::Service* Server::FindServiceByFullName(
const butil::StringPiece& full_name) const {
ServiceProperty* ss = _fullname_service_map.seek(full_name);
return (ss ? ss->service : NULL);
}
google::protobuf::Service* Server::FindServiceByName(
const butil::StringPiece& name) const {
ServiceProperty* ss = _service_map.seek(name);
return (ss ? ss->service : NULL);
}
void Server::GetStat(ServerStatistics* stat) const {
stat->connection_count = 0;
if (_am) {
stat->connection_count += _am->ConnectionCount();
}
if (_internal_am) {
stat->connection_count += _internal_am->ConnectionCount();
}
stat->user_service_count = service_count();
stat->builtin_service_count = builtin_service_count();
}
void Server::ListServices(std::vector<google::protobuf::Service*> *services) {
if (!services) {
return;
}
services->clear();
services->reserve(service_count());
for (ServiceMap::const_iterator it = _fullname_service_map.begin();
it != _fullname_service_map.end(); ++it) {
if (it->second.is_user_service()) {
services->push_back(it->second.service);
}
}
}
void Server::GenerateVersionIfNeeded() {
if (!_version.empty()) {
return;
}
int extra_count = !!_options.nshead_service + !!_options.rtmp_service +
!!_options.thrift_service + !!_options.redis_service;
_version.reserve((extra_count + service_count()) * 20);
for (ServiceMap::const_iterator it = _fullname_service_map.begin();
it != _fullname_service_map.end(); ++it) {
if (it->second.is_user_service()) {
if (!_version.empty()) {
_version.push_back('+');
}
_version.append(butil::class_name_str(*it->second.service));
}
}
if (_options.nshead_service) {
if (!_version.empty()) {
_version.push_back('+');
}
_version.append(butil::class_name_str(*_options.nshead_service));
}
#ifdef ENABLE_THRIFT_FRAMED_PROTOCOL
if (_options.thrift_service) {
if (!_version.empty()) {
_version.push_back('+');
}
_version.append(butil::class_name_str(*_options.thrift_service));
}
#endif
if (_options.rtmp_service) {
if (!_version.empty()) {
_version.push_back('+');
}
_version.append(butil::class_name_str(*_options.rtmp_service));
}
if (_options.redis_service) {
if (!_version.empty()) {
_version.push_back('+');
}
_version.append(butil::class_name_str(*_options.redis_service));
}
}
void Server::PutPidFileIfNeeded() {
if (_options.pid_file.empty()) {
return;
}
RPC_VLOG << "pid_file = " << _options.pid_file;
// Recursively create directory
for (size_t pos = _options.pid_file.find('/'); pos != std::string::npos;
pos = _options.pid_file.find('/', pos + 1)) {
std::string dir_name =_options.pid_file.substr(0, pos + 1);
int rc = mkdir(dir_name.c_str(),
S_IFDIR | S_IRUSR | S_IWUSR | S_IXUSR | S_IRGRP);
if (rc != 0 && errno != EEXIST
#if defined(OS_MACOSX)
&& errno != EISDIR
#endif
) {
PLOG(WARNING) << "Fail to create " << dir_name;
_options.pid_file.clear();
return;
}
}
int fd = open(_options.pid_file.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0666);
if (fd < 0) {
LOG(WARNING) << "Fail to open " << _options.pid_file;
_options.pid_file.clear();
return;
}
char buf[32];
int nw = snprintf(buf, sizeof(buf), "%lld", (long long)getpid());
CHECK_EQ(nw, write(fd, buf, nw));
CHECK_EQ(0, close(fd));
}
void Server::RunUntilAskedToQuit() {
while (!IsAskedToQuit()) {
bthread_usleep(1000000L);
}
Stop(0/*not used now*/);
Join();
}
void* thread_local_data() {
const Server::ThreadLocalOptions* tl_options =
static_cast<const Server::ThreadLocalOptions*>(bthread_get_assigned_data());
if (tl_options == NULL) { // not in server threads.
return NULL;
}
if (BAIDU_UNLIKELY(tl_options->thread_local_data_factory == NULL)) {
CHECK(false) << "The protocol impl. may not set tls correctly";
return NULL;
}
void* data = bthread_getspecific(tl_options->tls_key);
if (data == NULL) {
data = tl_options->thread_local_data_factory->CreateData();
if (data != NULL) {
CHECK_EQ(0, bthread_setspecific(tl_options->tls_key, data));
}
}
return data;
}
inline void tabs_li(std::ostream& os, const char* link,
const char* tab_name, const char* current_tab_name) {
os << "<li id='" << link << '\'';
if (strcmp(current_tab_name, tab_name) == 0) {
os << " class='current'";
}
os << '>' << tab_name << "</li>\n";
}
void Server::PrintTabsBody(std::ostream& os,
const char* current_tab_name) const {
os << "<ul class='tabs-menu'>\n";
if (_tab_info_list) {
for (size_t i = 0; i < _tab_info_list->size(); ++i) {
const TabInfo& info = (*_tab_info_list)[i];
tabs_li(os, info.path.c_str(), info.tab_name.c_str(),
current_tab_name);
}
}
os << "<li id='https://github.com/apache/brpc/blob/master/docs/cn/builtin_service.md' "
"class='help'>?</li>\n</ul>\n"
"<div style='height:40px;'></div>"; // placeholder
}
static pthread_mutex_t g_dummy_server_mutex = PTHREAD_MUTEX_INITIALIZER;
static Server* g_dummy_server = NULL;
int StartDummyServerAt(int port, ProfilerLinker) {
if (port < 0 || port >= 65536) {
LOG(ERROR) << "Invalid port=" << port;
return -1;
}
if (g_dummy_server == NULL) { // (1)
BAIDU_SCOPED_LOCK(g_dummy_server_mutex);
if (g_dummy_server == NULL) {
Server* dummy_server = new Server;
dummy_server->set_version(butil::string_printf(
"DummyServerOf(%s)", GetProgramName()));
ServerOptions options;
options.num_threads = 0;
if (dummy_server->Start(port, &options) != 0) {
LOG(ERROR) << "Fail to start dummy_server at port=" << port;
return -1;
}
// (1) may see uninitialized dummy_server due to relaxed memory
// fencing, but we only expose a function to test existence
// of g_dummy_server, everything should be fine.
g_dummy_server = dummy_server;
return 0;
}
}
LOG(ERROR) << "Already have dummy_server at port="
<< g_dummy_server->listen_address().port;
return -1;
}
bool IsDummyServerRunning() {
return g_dummy_server != NULL;
}
const Server::MethodProperty*
Server::FindMethodPropertyByFullName(const butil::StringPiece&fullname) const {
return _method_map.seek(fullname);
}
const Server::MethodProperty*
Server::FindMethodPropertyByFullName(const butil::StringPiece& service_name/*full*/,
const butil::StringPiece& method_name) const {
const size_t fullname_len = service_name.size() + 1 + method_name.size();
if (fullname_len <= 256) {
// Avoid allocation in most cases.
char buf[fullname_len];
memcpy(buf, service_name.data(), service_name.size());
buf[service_name.size()] = '.';
memcpy(buf + service_name.size() + 1, method_name.data(), method_name.size());
return FindMethodPropertyByFullName(butil::StringPiece(buf, fullname_len));
} else {
std::string full_method_name;
full_method_name.reserve(fullname_len);
full_method_name.append(service_name.data(), service_name.size());
full_method_name.push_back('.');
full_method_name.append(method_name.data(), method_name.size());
return FindMethodPropertyByFullName(full_method_name);
}
}
const Server::MethodProperty*
Server::FindMethodPropertyByNameAndIndex(const butil::StringPiece& service_name,
int method_index) const {
const Server::ServiceProperty* sp = FindServicePropertyByName(service_name);
if (NULL == sp) {
return NULL;
}
const google::protobuf::ServiceDescriptor* sd = sp->service->GetDescriptor();
if (method_index < 0 || method_index >= sd->method_count()) {
return NULL;
}
const google::protobuf::MethodDescriptor* method = sd->method(method_index);
return FindMethodPropertyByFullName(method->full_name());
}
const Server::ServiceProperty*
Server::FindServicePropertyByFullName(const butil::StringPiece& fullname) const {
return _fullname_service_map.seek(fullname);
}
const Server::ServiceProperty*
Server::FindServicePropertyByName(const butil::StringPiece& name) const {
return _service_map.seek(name);
}
int Server::AddCertificate(const CertInfo& cert) {
if (!_options.has_ssl_options()) {
LOG(ERROR) << "ServerOptions.ssl_options is not configured yet";
return -1;
}
std::string cert_key(cert.certificate);
cert_key.append(cert.private_key);
if (_ssl_ctx_map.seek(cert_key) != NULL) {
LOG(WARNING) << cert << " already exists";
return 0;
}
SSLContext ssl_ctx;
ssl_ctx.filters = cert.sni_filters;
ssl_ctx.ctx = std::make_shared<SocketSSLContext>();
SSL_CTX* raw_ctx = CreateServerSSLContext(cert.certificate, cert.private_key,
_options.ssl_options(), &ssl_ctx.filters);
if (raw_ctx == NULL) {
return -1;
}
ssl_ctx.ctx->raw_ctx = raw_ctx;
#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
SSL_CTX_set_tlsext_servername_callback(ssl_ctx.ctx->raw_ctx, SSLSwitchCTXByHostname);
SSL_CTX_set_tlsext_servername_arg(ssl_ctx.ctx->raw_ctx, this);
#endif
if (!_reload_cert_maps.Modify(AddCertMapping, ssl_ctx)) {
LOG(ERROR) << "Fail to add mappings into _reload_cert_maps";
return -1;
}
_ssl_ctx_map[cert_key] = ssl_ctx;
return 0;
}
bool Server::AddCertMapping(CertMaps& bg, const SSLContext& ssl_ctx) {
if (!bg.cert_map.initialized()
&& bg.cert_map.init(INITIAL_CERT_MAP) != 0) {
LOG(ERROR) << "Fail to init _cert_map";
return false;
}
if (!bg.wildcard_cert_map.initialized()
&& bg.wildcard_cert_map.init(INITIAL_CERT_MAP) != 0) {
LOG(ERROR) << "Fail to init _wildcard_cert_map";
return false;
}
for (size_t i = 0; i < ssl_ctx.filters.size(); ++i) {
const char* hostname = ssl_ctx.filters[i].c_str();
CertMap* cmap = NULL;
if (strncmp(hostname, "*.", 2) == 0) {
cmap = &(bg.wildcard_cert_map);
hostname += 2;
} else {
cmap = &(bg.cert_map);
}
if (cmap->seek(hostname) == NULL) {
cmap->insert(hostname, ssl_ctx.ctx);
} else {
LOG(WARNING) << "Duplicate certificate hostname=" << hostname;
}
}
return true;
}
int Server::RemoveCertificate(const CertInfo& cert) {
if (!_options.has_ssl_options()) {
LOG(ERROR) << "ServerOptions.ssl_options is not configured yet";
return -1;
}
std::string cert_key(cert.certificate);
cert_key.append(cert.private_key);
SSLContext* ssl_ctx = _ssl_ctx_map.seek(cert_key);
if (ssl_ctx == NULL) {
LOG(WARNING) << cert << " doesn't exist";
return 0;
}
if (ssl_ctx->ctx == _default_ssl_ctx) {
LOG(WARNING) << "Cannot remove: " << cert
<< " since it's the default certificate";
return -1;
}
if (!_reload_cert_maps.Modify(RemoveCertMapping, *ssl_ctx)) {
LOG(ERROR) << "Fail to remove mappings from _reload_cert_maps";
return -1;
}
_ssl_ctx_map.erase(cert_key);
return 0;
}
bool Server::RemoveCertMapping(CertMaps& bg, const SSLContext& ssl_ctx) {
for (size_t i = 0; i < ssl_ctx.filters.size(); ++i) {
const char* hostname = ssl_ctx.filters[i].c_str();
CertMap* cmap = NULL;
if (strncmp(hostname, "*.", 2) == 0) {
cmap = &(bg.wildcard_cert_map);
hostname += 2;
} else {
cmap = &(bg.cert_map);
}
std::shared_ptr<SocketSSLContext>* ctx = cmap->seek(hostname);
if (ctx != NULL && *ctx == ssl_ctx.ctx) {
cmap->erase(hostname);
}
}
return true;
}
int Server::ResetCertificates(const std::vector<CertInfo>& certs) {
if (!_options.has_ssl_options()) {
LOG(ERROR) << "ServerOptions.ssl_options is not configured yet";
return -1;
}
SSLContextMap tmp_map;
if (tmp_map.init(INITIAL_CERT_MAP) != 0) {
LOG(ERROR) << "Fail to initialize tmp_map";
return -1;
}
// Add default certificate into tmp_map first since it can't be reloaded
std::string default_cert_key =
_options.ssl_options().default_cert.certificate
+ _options.ssl_options().default_cert.private_key;
tmp_map[default_cert_key] = _ssl_ctx_map[default_cert_key];
for (size_t i = 0; i < certs.size(); ++i) {
std::string cert_key(certs[i].certificate);
cert_key.append(certs[i].private_key);
if (tmp_map.seek(cert_key) != NULL) {
LOG(WARNING) << certs[i] << " already exists";
return 0;
}
SSLContext ssl_ctx;
ssl_ctx.filters = certs[i].sni_filters;
ssl_ctx.ctx = std::make_shared<SocketSSLContext>();
ssl_ctx.ctx->raw_ctx = CreateServerSSLContext(
certs[i].certificate, certs[i].private_key,
_options.ssl_options(), &ssl_ctx.filters);
if (ssl_ctx.ctx->raw_ctx == NULL) {
return -1;
}
#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
SSL_CTX_set_tlsext_servername_callback(ssl_ctx.ctx->raw_ctx, SSLSwitchCTXByHostname);
SSL_CTX_set_tlsext_servername_arg(ssl_ctx.ctx->raw_ctx, this);
#endif
tmp_map[cert_key] = ssl_ctx;
}
if (!_reload_cert_maps.Modify(ResetCertMappings, tmp_map)) {
return -1;
}
_ssl_ctx_map.swap(tmp_map);
return 0;
}
bool Server::ResetCertMappings(CertMaps& bg, const SSLContextMap& ctx_map) {
if (!bg.cert_map.initialized()
&& bg.cert_map.init(INITIAL_CERT_MAP) != 0) {
LOG(ERROR) << "Fail to init _cert_map";
return false;
}
if (!bg.wildcard_cert_map.initialized()
&& bg.wildcard_cert_map.init(INITIAL_CERT_MAP) != 0) {
LOG(ERROR) << "Fail to init _wildcard_cert_map";
return false;
}
bg.cert_map.clear();
bg.wildcard_cert_map.clear();
for (SSLContextMap::const_iterator it =
ctx_map.begin(); it != ctx_map.end(); ++it) {
const SSLContext& ssl_ctx = it->second;
for (size_t i = 0; i < ssl_ctx.filters.size(); ++i) {
const char* hostname = ssl_ctx.filters[i].c_str();
CertMap* cmap = NULL;
if (strncmp(hostname, "*.", 2) == 0) {
cmap = &(bg.wildcard_cert_map);
hostname += 2;
} else {
cmap = &(bg.cert_map);
}
if (cmap->seek(hostname) == NULL) {
cmap->insert(hostname, ssl_ctx.ctx);
} else {
LOG(WARNING) << "Duplicate certificate hostname=" << hostname;
}
}
}
return true;
}
void Server::FreeSSLContexts() {
_ssl_ctx_map.clear();
_reload_cert_maps.Modify(ClearCertMapping);
_default_ssl_ctx = NULL;
}
bool Server::ClearCertMapping(CertMaps& bg) {
bg.cert_map.clear();
bg.wildcard_cert_map.clear();
return true;
}
int Server::ResetMaxConcurrency(int max_concurrency) {
if (!IsRunning()) {
LOG(WARNING) << "ResetMaxConcurrency is only allowed for a Running Server";
return -1;
}
// Assume that modifying int32 is atomical in X86
_options.max_concurrency = max_concurrency;
return 0;
}
AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(MethodProperty* mp) {
if (IsRunning()) {
LOG(WARNING) << "MaxConcurrencyOf is only allowd before Server started";
return g_default_max_concurrency_of_method;
}
if (mp->status == NULL) {
LOG(ERROR) << "method=" << mp->method->full_name()
<< " does not support max_concurrency";
_failed_to_set_max_concurrency_of_method = true;
return g_default_max_concurrency_of_method;
}
return mp->max_concurrency;
}
int Server::MaxConcurrencyOf(const MethodProperty* mp) const {
if (IsRunning()) {
LOG(WARNING) << "MaxConcurrencyOf is only allowd before Server started";
return g_default_max_concurrency_of_method;
}
if (mp == NULL || mp->status == NULL) {
return 0;
}
return mp->max_concurrency;
}
AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) {
MethodProperty* mp = _method_map.seek(full_method_name);
if (mp == NULL) {
LOG(ERROR) << "Fail to find method=" << full_method_name;
_failed_to_set_max_concurrency_of_method = true;
return g_default_max_concurrency_of_method;
}
return MaxConcurrencyOf(mp);
}
int Server::MaxConcurrencyOf(const butil::StringPiece& full_method_name) const {
return MaxConcurrencyOf(_method_map.seek(full_method_name));
}
AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name) {
MethodProperty* mp = const_cast<MethodProperty*>(
FindMethodPropertyByFullName(full_service_name, method_name));
if (mp == NULL) {
LOG(ERROR) << "Fail to find method=" << full_service_name
<< '/' << method_name;
_failed_to_set_max_concurrency_of_method = true;
return g_default_max_concurrency_of_method;
}
return MaxConcurrencyOf(mp);
}
int Server::MaxConcurrencyOf(const butil::StringPiece& full_service_name,
const butil::StringPiece& method_name) const {
return MaxConcurrencyOf(FindMethodPropertyByFullName(
full_service_name, method_name));
}
AdaptiveMaxConcurrency& Server::MaxConcurrencyOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) {
return MaxConcurrencyOf(service->GetDescriptor()->full_name(), method_name);
}
int Server::MaxConcurrencyOf(google::protobuf::Service* service,
const butil::StringPiece& method_name) const {
return MaxConcurrencyOf(service->GetDescriptor()->full_name(), method_name);
}
#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
int Server::SSLSwitchCTXByHostname(struct ssl_st* ssl,
int* al, Server* server) {
(void)al;
const char* hostname = SSL_get_servername(ssl, TLSEXT_NAMETYPE_host_name);
bool strict_sni = server->_options.ssl_options().strict_sni;
if (hostname == NULL) {
return strict_sni ? SSL_TLSEXT_ERR_ALERT_FATAL : SSL_TLSEXT_ERR_NOACK;
}
butil::DoublyBufferedData<CertMaps>::ScopedPtr s;
if (server->_reload_cert_maps.Read(&s) != 0) {
return SSL_TLSEXT_ERR_ALERT_FATAL;
}
std::shared_ptr<SocketSSLContext>* pctx = s->cert_map.seek(hostname);
if (pctx == NULL) {
const char* dot = hostname;
for (; *dot != '\0'; ++dot) {
if (*dot == '.') {
++dot;
break;
}
}
if (*dot != '\0') {
pctx = s->wildcard_cert_map.seek(dot);
}
}
if (pctx == NULL) {
if (strict_sni) {
return SSL_TLSEXT_ERR_ALERT_FATAL;
}
// Use default SSL_CTX which is the current one
return SSL_TLSEXT_ERR_OK;
}
// Switch SSL_CTX to the one with correct hostname
SSL_set_SSL_CTX(ssl, (*pctx)->raw_ctx);
return SSL_TLSEXT_ERR_OK;
}
#endif // SSL_CTRL_SET_TLSEXT_HOSTNAME
} // namespace brpc