| // Copyright (c) 2014 Baidu, Inc. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| // Authors: Ge,Jun (gejun@baidu.com) |
| |
| #include <openssl/ssl.h> |
| #include <gflags/gflags.h> |
| #include <fcntl.h> // O_RDONLY |
| #include <signal.h> |
| |
| #include "butil/build_config.h" // OS_LINUX |
| // Naming services |
| #ifdef BAIDU_INTERNAL |
| #include "brpc/policy/baidu_naming_service.h" |
| #endif |
| #include "brpc/policy/file_naming_service.h" |
| #include "brpc/policy/list_naming_service.h" |
| #include "brpc/policy/domain_naming_service.h" |
| #include "brpc/policy/remote_file_naming_service.h" |
| |
| // Load Balancers |
| #include "brpc/policy/round_robin_load_balancer.h" |
| #include "brpc/policy/randomized_load_balancer.h" |
| #include "brpc/policy/locality_aware_load_balancer.h" |
| #include "brpc/policy/consistent_hashing_load_balancer.h" |
| #include "brpc/policy/hasher.h" |
| #include "brpc/policy/dynpart_load_balancer.h" |
| |
| // Compress handlers |
| #include "brpc/compress.h" |
| #include "brpc/policy/gzip_compress.h" |
| #include "brpc/policy/snappy_compress.h" |
| |
| // Protocols |
| #include "brpc/protocol.h" |
| #include "brpc/policy/baidu_rpc_protocol.h" |
| #include "brpc/policy/http_rpc_protocol.h" |
| #include "brpc/policy/hulu_pbrpc_protocol.h" |
| #include "brpc/policy/nova_pbrpc_protocol.h" |
| #include "brpc/policy/public_pbrpc_protocol.h" |
| #include "brpc/policy/ubrpc2pb_protocol.h" |
| #include "brpc/policy/sofa_pbrpc_protocol.h" |
| #include "brpc/policy/memcache_binary_protocol.h" |
| #include "brpc/policy/streaming_rpc_protocol.h" |
| #include "brpc/policy/mongo_protocol.h" |
| #include "brpc/policy/redis_protocol.h" |
| #include "brpc/policy/nshead_mcpack_protocol.h" |
| #include "brpc/policy/rtmp_protocol.h" |
| #include "brpc/policy/esp_protocol.h" |
| |
| #include "brpc/input_messenger.h" // get_or_new_client_side_messenger |
| #include "brpc/socket_map.h" // SocketMapList |
| #include "brpc/server.h" |
| #include "brpc/trackme.h" // TrackMe |
| #include "brpc/details/usercode_backup_pool.h" |
| #if defined(OS_LINUX) |
| #include <malloc.h> // malloc_trim |
| #endif |
| #include "butil/fd_guard.h" |
| #include "butil/files/file_watcher.h" |
| |
| extern "C" { |
| // defined in gperftools/malloc_extension_c.h |
| void BAIDU_WEAK MallocExtension_ReleaseFreeMemory(void); |
| } |
| |
| namespace brpc { |
| |
| DECLARE_bool(usercode_in_pthread); |
| |
| DEFINE_int32(free_memory_to_system_interval, 0, |
| "Try to return free memory to system every so many seconds, " |
| "values <= 0 disables this feature"); |
| BRPC_VALIDATE_GFLAG(free_memory_to_system_interval, PassValidate); |
| |
| namespace policy { |
| // Defined in http_rpc_protocol.cpp |
| void InitCommonStrings(); |
| } |
| |
| using namespace policy; |
| |
| const char* const DUMMY_SERVER_PORT_FILE = "dummy_server.port"; |
| |
| struct GlobalExtensions { |
| GlobalExtensions() |
| : ch_mh_lb(MurmurHash32) |
| , ch_md5_lb(MD5Hash32){} |
| #ifdef BAIDU_INTERNAL |
| BaiduNamingService bns; |
| #endif |
| FileNamingService fns; |
| ListNamingService lns; |
| DomainNamingService dns; |
| RemoteFileNamingService rfns; |
| |
| RoundRobinLoadBalancer rr_lb; |
| RandomizedLoadBalancer randomized_lb; |
| LocalityAwareLoadBalancer la_lb; |
| ConsistentHashingLoadBalancer ch_mh_lb; |
| ConsistentHashingLoadBalancer ch_md5_lb; |
| DynPartLoadBalancer dynpart_lb; |
| }; |
| |
| static pthread_once_t register_extensions_once = PTHREAD_ONCE_INIT; |
| static GlobalExtensions* g_ext = NULL; |
| |
| static long ReadPortOfDummyServer(const char* filename) { |
| butil::fd_guard fd(open(filename, O_RDONLY)); |
| if (fd < 0) { |
| LOG(ERROR) << "Fail to open `" << DUMMY_SERVER_PORT_FILE << "'"; |
| return -1; |
| } |
| char port_str[32]; |
| const ssize_t nr = read(fd, port_str, sizeof(port_str)); |
| if (nr <= 0) { |
| LOG(ERROR) << "Fail to read `" << DUMMY_SERVER_PORT_FILE << "': " |
| << (nr == 0 ? "nothing to read" : berror()); |
| return -1; |
| } |
| port_str[std::min((size_t)nr, sizeof(port_str)-1)] = '\0'; |
| const char* p = port_str; |
| for (; isspace(*p); ++p) {} |
| char* endptr = NULL; |
| const long port = strtol(p, &endptr, 10); |
| for (; isspace(*endptr); ++endptr) {} |
| if (*endptr != '\0') { |
| LOG(ERROR) << "Invalid port=`" << port_str << "'"; |
| return -1; |
| } |
| return port; |
| } |
| |
| // Expose counters of butil::IOBuf |
| static int64_t GetIOBufBlockCount(void*) { |
| return butil::IOBuf::block_count(); |
| } |
| static int64_t GetIOBufBlockCountHitTLSThreshold(void*) { |
| return butil::IOBuf::block_count_hit_tls_threshold(); |
| } |
| static int64_t GetIOBufNewBigViewCount(void*) { |
| return butil::IOBuf::new_bigview_count(); |
| } |
| static int64_t GetIOBufBlockMemory(void*) { |
| return butil::IOBuf::block_memory(); |
| } |
| |
| // Defined in server.cpp |
| extern butil::static_atomic<int> g_running_server_count; |
| static int GetRunningServerCount(void*) { |
| return g_running_server_count.load(butil::memory_order_relaxed); |
| } |
| |
| // Update global stuff periodically. |
| static void* GlobalUpdate(void*) { |
| // Expose variables. |
| bvar::PassiveStatus<int64_t> var_iobuf_block_count( |
| "iobuf_block_count", GetIOBufBlockCount, NULL); |
| bvar::PassiveStatus<int64_t> var_iobuf_block_count_hit_tls_threshold( |
| "iobuf_block_count_hit_tls_threshold", |
| GetIOBufBlockCountHitTLSThreshold, NULL); |
| bvar::PassiveStatus<int64_t> var_iobuf_new_bigview_count( |
| GetIOBufNewBigViewCount, NULL); |
| bvar::PerSecond<bvar::PassiveStatus<int64_t> > var_iobuf_new_bigview_second( |
| "iobuf_newbigview_second", &var_iobuf_new_bigview_count); |
| bvar::PassiveStatus<int64_t> var_iobuf_block_memory( |
| "iobuf_block_memory", GetIOBufBlockMemory, NULL); |
| bvar::PassiveStatus<int> var_running_server_count( |
| "rpc_server_count", GetRunningServerCount, NULL); |
| |
| butil::FileWatcher fw; |
| if (fw.init_from_not_exist(DUMMY_SERVER_PORT_FILE) < 0) { |
| LOG(FATAL) << "Fail to init FileWatcher on `" << DUMMY_SERVER_PORT_FILE << "'"; |
| return NULL; |
| } |
| |
| std::vector<SocketId> conns; |
| const int64_t start_time_us = butil::gettimeofday_us(); |
| const int WARN_NOSLEEP_THRESHOLD = 2; |
| int64_t last_time_us = start_time_us; |
| int consecutive_nosleep = 0; |
| int64_t last_return_free_memory_time = start_time_us; |
| while (1) { |
| const int64_t sleep_us = 1000000L + last_time_us - butil::gettimeofday_us(); |
| if (sleep_us > 0) { |
| if (bthread_usleep(sleep_us) < 0) { |
| PLOG_IF(FATAL, errno != ESTOP) << "Fail to sleep"; |
| break; |
| } |
| consecutive_nosleep = 0; |
| } else { |
| if (++consecutive_nosleep >= WARN_NOSLEEP_THRESHOLD) { |
| consecutive_nosleep = 0; |
| LOG(WARNING) << __FUNCTION__ << " is too busy!"; |
| } |
| } |
| last_time_us = butil::gettimeofday_us(); |
| |
| TrackMe(); |
| |
| if (!IsDummyServerRunning() |
| && g_running_server_count.load(butil::memory_order_relaxed) == 0 |
| && fw.check_and_consume() > 0) { |
| long port = ReadPortOfDummyServer(DUMMY_SERVER_PORT_FILE); |
| if (port >= 0) { |
| StartDummyServerAt(port); |
| } |
| } |
| |
| SocketMapList(&conns); |
| const int64_t now_ms = butil::cpuwide_time_ms(); |
| for (size_t i = 0; i < conns.size(); ++i) { |
| SocketUniquePtr ptr; |
| if (Socket::Address(conns[i], &ptr) == 0) { |
| ptr->UpdateStatsEverySecond(now_ms); |
| } |
| } |
| |
| const int return_mem_interval = |
| FLAGS_free_memory_to_system_interval/*reloadable*/; |
| if (return_mem_interval > 0 && |
| last_time_us >= last_return_free_memory_time + |
| return_mem_interval * 1000000L) { |
| last_return_free_memory_time = last_time_us; |
| // TODO: Calling MallocExtension::instance()->ReleaseFreeMemory may |
| // crash the program in later calls to malloc, verified on tcmalloc |
| // 1.7 and 2.5, which means making the static member function weak |
| // in details/tcmalloc_extension.cpp is probably not correct, however |
| // it does work for heap profilers. |
| if (MallocExtension_ReleaseFreeMemory != NULL) { |
| MallocExtension_ReleaseFreeMemory(); |
| } else { |
| #if defined(OS_LINUX) |
| // GNU specific. |
| malloc_trim(10 * 1024 * 1024/*leave 10M pad*/); |
| #endif |
| } |
| } |
| } |
| return NULL; |
| } |
| |
| static void BaiduStreamingLogHandler(google::protobuf::LogLevel level, |
| const char* filename, int line, |
| const std::string& message) { |
| switch (level) { |
| case google::protobuf::LOGLEVEL_INFO: |
| LOG(INFO) << filename << ':' << line << ' ' << message; |
| return; |
| case google::protobuf::LOGLEVEL_WARNING: |
| LOG(WARNING) << filename << ':' << line << ' ' << message; |
| return; |
| case google::protobuf::LOGLEVEL_ERROR: |
| LOG(ERROR) << filename << ':' << line << ' ' << message; |
| return; |
| case google::protobuf::LOGLEVEL_FATAL: |
| LOG(FATAL) << filename << ':' << line << ' ' << message; |
| return; |
| } |
| CHECK(false) << filename << ':' << line << ' ' << message; |
| } |
| |
| static void GlobalInitializeOrDieImpl() { |
| ////////////////////////////////////////////////////////////////// |
| // Be careful about usages of gflags inside this function which // |
| // may be called before main() only seeing gflags with default // |
| // values even if the gflags will be set after main(). // |
| ////////////////////////////////////////////////////////////////// |
| |
| // Ignore SIGPIPE. |
| struct sigaction oldact; |
| if (sigaction(SIGPIPE, NULL, &oldact) != 0 || |
| (oldact.sa_handler == NULL && oldact.sa_sigaction == NULL)) { |
| CHECK(NULL == signal(SIGPIPE, SIG_IGN)); |
| } |
| |
| // Make GOOGLE_LOG print to comlog device |
| SetLogHandler(&BaiduStreamingLogHandler); |
| |
| // Setting the variable here does not work, the profiler probably check |
| // the variable before main() for only once. |
| // setenv("TCMALLOC_SAMPLE_PARAMETER", "524288", 0); |
| |
| // Initialize openssl library |
| SSL_library_init(); |
| SSL_load_error_strings(); |
| if (SSLThreadInit() != 0 || SSLDHInit() != 0) { |
| exit(1); |
| } |
| |
| // Defined in http_rpc_protocol.cpp |
| InitCommonStrings(); |
| |
| // Leave memory of these extensions to process's clean up. |
| g_ext = new(std::nothrow) GlobalExtensions(); |
| if (NULL == g_ext) { |
| exit(1); |
| } |
| // Naming Services |
| #ifdef BAIDU_INTERNAL |
| NamingServiceExtension()->RegisterOrDie("bns", &g_ext->bns); |
| #endif |
| NamingServiceExtension()->RegisterOrDie("file", &g_ext->fns); |
| NamingServiceExtension()->RegisterOrDie("list", &g_ext->lns); |
| NamingServiceExtension()->RegisterOrDie("http", &g_ext->dns); |
| NamingServiceExtension()->RegisterOrDie("remotefile", &g_ext->rfns); |
| |
| // Load Balancers |
| LoadBalancerExtension()->RegisterOrDie("rr", &g_ext->rr_lb); |
| LoadBalancerExtension()->RegisterOrDie("random", &g_ext->randomized_lb); |
| LoadBalancerExtension()->RegisterOrDie("la", &g_ext->la_lb); |
| LoadBalancerExtension()->RegisterOrDie("c_murmurhash", &g_ext->ch_mh_lb); |
| LoadBalancerExtension()->RegisterOrDie("c_md5", &g_ext->ch_md5_lb); |
| LoadBalancerExtension()->RegisterOrDie("_dynpart", &g_ext->dynpart_lb); |
| |
| // Compress Handlers |
| const CompressHandler gzip_compress = |
| { GzipCompress, GzipDecompress, "gzip" }; |
| if (RegisterCompressHandler(COMPRESS_TYPE_GZIP, gzip_compress) != 0) { |
| exit(1); |
| } |
| const CompressHandler zlib_compress = |
| { ZlibCompress, ZlibDecompress, "zlib" }; |
| if (RegisterCompressHandler(COMPRESS_TYPE_ZLIB, zlib_compress) != 0) { |
| exit(1); |
| } |
| const CompressHandler snappy_compress = |
| { SnappyCompress, SnappyDecompress, "snappy" }; |
| if (RegisterCompressHandler(COMPRESS_TYPE_SNAPPY, snappy_compress) != 0) { |
| exit(1); |
| } |
| |
| // Protocols |
| Protocol baidu_protocol = { ParseRpcMessage, |
| SerializeRequestDefault, PackRpcRequest, |
| ProcessRpcRequest, ProcessRpcResponse, |
| VerifyRpcRequest, NULL, NULL, |
| CONNECTION_TYPE_ALL, "baidu_std" }; |
| if (RegisterProtocol(PROTOCOL_BAIDU_STD, baidu_protocol) != 0) { |
| exit(1); |
| } |
| |
| Protocol streaming_protocol = { ParseStreamingMessage, |
| NULL, NULL, ProcessStreamingMessage, |
| ProcessStreamingMessage, |
| NULL, NULL, NULL, |
| CONNECTION_TYPE_SINGLE, "streaming_rpc" }; |
| |
| if (RegisterProtocol(PROTOCOL_STREAMING_RPC, streaming_protocol) != 0) { |
| exit(1); |
| } |
| |
| Protocol http_protocol = { ParseHttpMessage, |
| SerializeHttpRequest, PackHttpRequest, |
| ProcessHttpRequest, ProcessHttpResponse, |
| VerifyHttpRequest, ParseHttpServerAddress, |
| GetHttpMethodName, |
| CONNECTION_TYPE_POOLED_AND_SHORT, |
| "http" }; |
| if (RegisterProtocol(PROTOCOL_HTTP, http_protocol) != 0) { |
| exit(1); |
| } |
| |
| Protocol hulu_protocol = { ParseHuluMessage, |
| SerializeRequestDefault, PackHuluRequest, |
| ProcessHuluRequest, ProcessHuluResponse, |
| VerifyHuluRequest, NULL, NULL, |
| CONNECTION_TYPE_ALL, "hulu_pbrpc" }; |
| if (RegisterProtocol(PROTOCOL_HULU_PBRPC, hulu_protocol) != 0) { |
| exit(1); |
| } |
| |
| // Only valid at client side |
| Protocol nova_protocol = { ParseNsheadMessage, |
| SerializeNovaRequest, PackNovaRequest, |
| NULL, ProcessNovaResponse, |
| NULL, NULL, NULL, |
| CONNECTION_TYPE_POOLED_AND_SHORT, "nova_pbrpc" }; |
| if (RegisterProtocol(PROTOCOL_NOVA_PBRPC, nova_protocol) != 0) { |
| exit(1); |
| } |
| |
| // Only valid at client side |
| Protocol public_pbrpc_protocol = { ParseNsheadMessage, |
| SerializePublicPbrpcRequest, |
| PackPublicPbrpcRequest, |
| NULL, ProcessPublicPbrpcResponse, |
| NULL, NULL, NULL, |
| // public_pbrpc server implementation |
| // doesn't support full duplex |
| CONNECTION_TYPE_POOLED_AND_SHORT, |
| "public_pbrpc" }; |
| if (RegisterProtocol(PROTOCOL_PUBLIC_PBRPC, public_pbrpc_protocol) != 0) { |
| exit(1); |
| } |
| |
| Protocol sofa_protocol = { ParseSofaMessage, |
| SerializeRequestDefault, PackSofaRequest, |
| ProcessSofaRequest, ProcessSofaResponse, |
| VerifySofaRequest, NULL, NULL, |
| CONNECTION_TYPE_ALL, "sofa_pbrpc" }; |
| if (RegisterProtocol(PROTOCOL_SOFA_PBRPC, sofa_protocol) != 0) { |
| exit(1); |
| } |
| |
| // Only valid at server side. We generalize all the protocols that |
| // prefixes with nshead as `nshead_protocol' and specify the content |
| // parsing after nshead by ServerOptions.nshead_service. |
| Protocol nshead_protocol = { ParseNsheadMessage, |
| SerializeNsheadRequest, PackNsheadRequest, |
| ProcessNsheadRequest, ProcessNsheadResponse, |
| VerifyNsheadRequest, NULL, NULL, |
| CONNECTION_TYPE_POOLED_AND_SHORT, "nshead" }; |
| if (RegisterProtocol(PROTOCOL_NSHEAD, nshead_protocol) != 0) { |
| exit(1); |
| } |
| |
| Protocol mc_binary_protocol = { ParseMemcacheMessage, |
| SerializeMemcacheRequest, |
| PackMemcacheRequest, |
| NULL, ProcessMemcacheResponse, |
| NULL, NULL, GetMemcacheMethodName, |
| CONNECTION_TYPE_ALL, "memcache" }; |
| if (RegisterProtocol(PROTOCOL_MEMCACHE, mc_binary_protocol) != 0) { |
| exit(1); |
| } |
| |
| Protocol redis_protocol = { ParseRedisMessage, |
| SerializeRedisRequest, |
| PackRedisRequest, |
| NULL, ProcessRedisResponse, |
| NULL, NULL, GetRedisMethodName, |
| CONNECTION_TYPE_ALL, "redis" }; |
| if (RegisterProtocol(PROTOCOL_REDIS, redis_protocol) != 0) { |
| exit(1); |
| } |
| |
| Protocol mongo_protocol = { ParseMongoMessage, |
| NULL, NULL, |
| ProcessMongoRequest, NULL, |
| NULL, NULL, NULL, |
| CONNECTION_TYPE_POOLED, "mongo" }; |
| if (RegisterProtocol(PROTOCOL_MONGO, mongo_protocol) != 0) { |
| exit(1); |
| } |
| |
| // Only valid at client side |
| Protocol ubrpc_compack_protocol = { |
| ParseNsheadMessage, |
| SerializeUbrpcCompackRequest, PackUbrpcRequest, |
| NULL, ProcessUbrpcResponse, |
| NULL, NULL, NULL, |
| CONNECTION_TYPE_POOLED_AND_SHORT, "ubrpc_compack" }; |
| if (RegisterProtocol(PROTOCOL_UBRPC_COMPACK, ubrpc_compack_protocol) != 0) { |
| exit(1); |
| } |
| Protocol ubrpc_mcpack2_protocol = { |
| ParseNsheadMessage, |
| SerializeUbrpcMcpack2Request, PackUbrpcRequest, |
| NULL, ProcessUbrpcResponse, |
| NULL, NULL, NULL, |
| CONNECTION_TYPE_POOLED_AND_SHORT, "ubrpc_mcpack2" }; |
| if (RegisterProtocol(PROTOCOL_UBRPC_MCPACK2, ubrpc_mcpack2_protocol) != 0) { |
| exit(1); |
| } |
| |
| // Only valid at client side |
| Protocol nshead_mcpack_protocol = { |
| ParseNsheadMessage, |
| SerializeNsheadMcpackRequest, PackNsheadMcpackRequest, |
| NULL, ProcessNsheadMcpackResponse, |
| NULL, NULL, NULL, |
| CONNECTION_TYPE_POOLED_AND_SHORT, "nshead_mcpack" }; |
| if (RegisterProtocol(PROTOCOL_NSHEAD_MCPACK, nshead_mcpack_protocol) != 0) { |
| exit(1); |
| } |
| |
| Protocol rtmp_protocol = { |
| ParseRtmpMessage, |
| SerializeRtmpRequest, PackRtmpRequest, |
| ProcessRtmpMessage, ProcessRtmpMessage, |
| NULL, NULL, NULL, |
| (ConnectionType)(CONNECTION_TYPE_SINGLE|CONNECTION_TYPE_SHORT), |
| "rtmp" }; |
| if (RegisterProtocol(PROTOCOL_RTMP, rtmp_protocol) != 0) { |
| exit(1); |
| } |
| |
| Protocol esp_protocol = { |
| ParseEspMessage, |
| SerializeEspRequest, PackEspRequest, |
| NULL, ProcessEspResponse, |
| NULL, NULL, NULL, |
| CONNECTION_TYPE_POOLED_AND_SHORT, "esp"}; |
| if (RegisterProtocol(PROTOCOL_ESP, esp_protocol) != 0) { |
| exit(1); |
| } |
| |
| std::vector<Protocol> protocols; |
| ListProtocols(&protocols); |
| for (size_t i = 0; i < protocols.size(); ++i) { |
| if (protocols[i].process_response) { |
| InputMessageHandler handler; |
| // `process_response' is required at client side |
| handler.parse = protocols[i].parse; |
| handler.process = protocols[i].process_response; |
| // No need to verify at client side |
| handler.verify = NULL; |
| handler.arg = NULL; |
| handler.name = protocols[i].name; |
| if (get_or_new_client_side_messenger()->AddHandler(handler) != 0) { |
| exit(1); |
| } |
| } |
| } |
| |
| if (FLAGS_usercode_in_pthread) { |
| // Optional. If channel/server are initialized before main(), this |
| // flag may be false at here even if it will be set to true after |
| // main(). In which case, the usercode pool will not be initialized |
| // until the pool is used. |
| InitUserCodeBackupPoolOnceOrDie(); |
| } |
| |
| // We never join GlobalUpdate, let it quit with the process. |
| bthread_t th; |
| CHECK(bthread_start_background(&th, NULL, GlobalUpdate, NULL) == 0) |
| << "Fail to start GlobalUpdate"; |
| } |
| |
| void GlobalInitializeOrDie() { |
| if (pthread_once(®ister_extensions_once, |
| GlobalInitializeOrDieImpl) != 0) { |
| LOG(FATAL) << "Fail to pthread_once"; |
| exit(1); |
| } |
| } |
| |
| } // namespace brpc |