| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| |
| #include <signal.h> |
| #include <openssl/md5.h> |
| #include <google/protobuf/descriptor.h> |
| #include <gflags/gflags.h> |
| #include "bthread/bthread.h" |
| #include "butil/build_config.h" // OS_MACOSX |
| #include "butil/string_printf.h" |
| #include "butil/logging.h" |
| #include "butil/time.h" |
| #include "bthread/bthread.h" |
| #include "bthread/unstable.h" |
| #include "bvar/bvar.h" |
| #include "brpc/socket.h" |
| #include "brpc/socket_map.h" |
| #include "brpc/channel.h" |
| #include "brpc/load_balancer.h" |
| #include "brpc/closure_guard.h" |
| #include "brpc/details/controller_private_accessor.h" |
| #include "brpc/controller.h" |
| #include "brpc/span.h" |
| #include "brpc/server.h" // Server::_session_local_data_pool |
| #include "brpc/simple_data_pool.h" |
| #include "brpc/retry_policy.h" |
| #include "brpc/stream_impl.h" |
| #include "brpc/policy/streaming_rpc_protocol.h" // FIXME |
| #include "brpc/rpc_dump.h" |
| #include "brpc/details/usercode_backup_pool.h" // RunUserCode |
| #include "brpc/mongo_service_adaptor.h" |
| |
// Force linking the .o files in UT (which analyzes dependencies by inclusions)
| #include "brpc/parallel_channel.h" |
| #include "brpc/selective_channel.h" |
| #include "bthread/task_group.h" |
| |
| namespace bthread { |
| extern BAIDU_THREAD_LOCAL TaskGroup* tls_task_group; |
| } |
| |
// This is the only file that both the client and server sides must link, so
// we put the errno registrations here.
| BAIDU_REGISTER_ERRNO(brpc::ENOSERVICE, "No such service"); |
| BAIDU_REGISTER_ERRNO(brpc::ENOMETHOD, "No such method"); |
| BAIDU_REGISTER_ERRNO(brpc::EREQUEST, "Bad request"); |
| BAIDU_REGISTER_ERRNO(brpc::ERPCAUTH, "Authentication failed"); |
| BAIDU_REGISTER_ERRNO(brpc::ETOOMANYFAILS, "Too many sub channels failed"); |
| BAIDU_REGISTER_ERRNO(brpc::EPCHANFINISH, "ParallelChannel finished"); |
| BAIDU_REGISTER_ERRNO(brpc::EBACKUPREQUEST, "Sending backup request"); |
BAIDU_REGISTER_ERRNO(brpc::ERPCTIMEDOUT, "RPC call timed out");
| BAIDU_REGISTER_ERRNO(brpc::EFAILEDSOCKET, "Broken socket"); |
| BAIDU_REGISTER_ERRNO(brpc::EHTTP, "Bad http call"); |
| BAIDU_REGISTER_ERRNO(brpc::EOVERCROWDED, "The server is overcrowded"); |
| BAIDU_REGISTER_ERRNO(brpc::ERTMPPUBLISHABLE, "RtmpRetryingClientStream is publishable"); |
| BAIDU_REGISTER_ERRNO(brpc::ERTMPCREATESTREAM, "createStream was rejected by the RTMP server"); |
| BAIDU_REGISTER_ERRNO(brpc::EEOF, "Got EOF"); |
| BAIDU_REGISTER_ERRNO(brpc::EUNUSED, "The socket was not needed"); |
| BAIDU_REGISTER_ERRNO(brpc::ESSL, "SSL related operation failed"); |
BAIDU_REGISTER_ERRNO(brpc::EH2RUNOUTSTREAMS, "The H2 socket has run out of streams");
| |
| BAIDU_REGISTER_ERRNO(brpc::EINTERNAL, "General internal error"); |
| BAIDU_REGISTER_ERRNO(brpc::ERESPONSE, "Bad response"); |
| BAIDU_REGISTER_ERRNO(brpc::ELOGOFF, "Server is stopping"); |
| BAIDU_REGISTER_ERRNO(brpc::ELIMIT, "Reached server's max_concurrency"); |
| BAIDU_REGISTER_ERRNO(brpc::ECLOSE, "Close socket initiatively"); |
| BAIDU_REGISTER_ERRNO(brpc::EITP, "Bad Itp response"); |
| |
| #if BRPC_WITH_RDMA |
| BAIDU_REGISTER_ERRNO(brpc::ERDMA, "RDMA verbs error"); |
| BAIDU_REGISTER_ERRNO(brpc::ERDMAMEM, "Memory not registered for RDMA"); |
| #endif |
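
// The descriptions registered above are what berror() returns for these
// codes. A minimal usage sketch (hypothetical caller code kept in a comment,
// not compiled here; the header paths are illustrative):
//
//   #include <butil/errno.h>       // berror()
//   #include <brpc/errno.pb.h>     // brpc::ERPCTIMEDOUT etc.
//
//   int saved_error = cntl.ErrorCode();
//   LOG(ERROR) << "code=" << saved_error << " desc=" << berror(saved_error);
//   // e.g. berror(brpc::ERPCTIMEDOUT) == "RPC call timed out"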
| |
| DECLARE_bool(log_as_json); |
| |
| namespace brpc { |
| |
DEFINE_bool(graceful_quit_on_sigterm, false,
            "Register a SIGTERM handler to quit gracefully");
DEFINE_bool(graceful_quit_on_sighup, false,
            "Register a SIGHUP handler to quit gracefully");
| |
| const IdlNames idl_single_req_single_res = { "req", "res" }; |
| const IdlNames idl_single_req_multi_res = { "req", "" }; |
| const IdlNames idl_multi_req_single_res = { "", "res" }; |
| const IdlNames idl_multi_req_multi_res = { "", "" }; |
| |
| extern const int64_t IDL_VOID_RESULT = 12345678987654321LL; |
| |
// For the definitely-false branch in src/brpc/profiler_link.h
| int PROFILER_LINKER_DUMMY = 0; |
| |
| static void PrintRevision(std::ostream& os, void*) { |
| #if defined(BRPC_REVISION) |
| os << BRPC_REVISION; |
| #else |
| os << "undefined"; |
| #endif |
| } |
| static bvar::PassiveStatus<std::string> s_rpc_revision( |
| "rpc_revision", PrintRevision, NULL); |
| |
| static const int RETRY_AVOIDANCE = 8; |
| |
| // Defined in parallel_channel.cpp |
| void DestroyParallelChannelDone(google::protobuf::Closure* c); |
| const Controller* GetSubControllerOfParallelChannel( |
| const google::protobuf::Closure* done, int index); |
| const Controller* GetSubControllerOfSelectiveChannel( |
| const RPCSender* sender, int index); |
| |
| DECLARE_bool(usercode_in_pthread); |
| DECLARE_bool(usercode_in_coroutine); |
| static const int MAX_RETRY_COUNT = 1000; |
| static bvar::Adder<int64_t>* g_ncontroller = NULL; |
| |
| static pthread_once_t s_create_vars_once = PTHREAD_ONCE_INIT; |
| |
| static void CreateVars() { |
| g_ncontroller = new bvar::Adder<int64_t>("rpc_controller_count"); |
| } |
| |
| Controller::Controller() { |
| CHECK_EQ(0, pthread_once(&s_create_vars_once, CreateVars)); |
| *g_ncontroller << 1; |
| ResetPods(); |
| } |
| |
| Controller::Controller(const Inheritable& parent_ctx) { |
| CHECK_EQ(0, pthread_once(&s_create_vars_once, CreateVars)); |
| *g_ncontroller << 1; |
| ResetPods(); |
| _inheritable = parent_ctx; |
| } |
| |
| struct SessionKVFlusher { |
| Controller* cntl; |
| }; |
| static std::ostream& operator<<(std::ostream& os, const SessionKVFlusher& f) { |
| f.cntl->FlushSessionKV(os); |
| return os; |
| } |
| |
| Controller::~Controller() { |
| *g_ncontroller << -1; |
| if (_session_kv != nullptr && _session_kv->Count() != 0) { |
| LOG(INFO) << SessionKVFlusher{ this }; |
| } |
| ResetNonPods(); |
| } |
| |
| class IgnoreAllRead : public ProgressiveReader { |
| public: |
| // @ProgressiveReader |
| butil::Status OnReadOnePart(const void* /*data*/, size_t /*length*/) { |
| return butil::Status::OK(); |
| } |
| void OnEndOfMessage(const butil::Status&) {} |
| }; |
| |
| static IgnoreAllRead* s_ignore_all_read = NULL; |
| static pthread_once_t s_ignore_all_read_once = PTHREAD_ONCE_INIT; |
| static void CreateIgnoreAllRead() { s_ignore_all_read = new IgnoreAllRead; } |
| |
// If a resource needs to be destroyed or memory needs to be deleted (whether
// directly or indirectly referenced), do it in this method. Notice that you
// don't have to reset the fields to their initial state after deletion since
// they'll be set uniformly after this method is called.
| void Controller::ResetNonPods() { |
| if (_span) { |
| Span::Submit(_span, butil::cpuwide_time_us()); |
| } |
| _error_text.clear(); |
| _remote_side = butil::EndPoint(); |
| _local_side = butil::EndPoint(); |
| if (_session_local_data) { |
| _server->_session_local_data_pool->Return(_session_local_data); |
| } |
| _mongo_session_data.reset(); |
| delete _sampled_request; |
| |
| if (!is_used_by_rpc() && _correlation_id != INVALID_BTHREAD_ID) { |
| CHECK_NE(EPERM, bthread_id_cancel(_correlation_id)); |
| } |
| if (_oncancel_id != INVALID_BTHREAD_ID) { |
| bthread_id_error(_oncancel_id, 0); |
| } |
| if (_pchan_sub_count > 0) { |
| DestroyParallelChannelDone(_done); |
| } |
| delete _sender; |
| _lb.reset(NULL); |
| _current_call.Reset(); |
| ExcludedServers::Destroy(_accessed); |
| _request_buf.clear(); |
| delete _http_request; |
| delete _http_response; |
| delete _request_user_fields; |
| delete _response_user_fields; |
| _request_attachment.clear(); |
| _response_attachment.clear(); |
| if (_wpa) { |
| _wpa->MarkRPCAsDone(Failed()); |
| _wpa.reset(NULL); |
| } |
| if (_rpa != NULL) { |
| if (!has_progressive_reader()) { |
| // Never called ReadProgressiveAttachmentBy (successfully), the data |
| // is probably being buffered and a full buffer may block parse |
| // handler of the protocol. We need to set a reader to consume |
| // the buffer. |
| pthread_once(&s_ignore_all_read_once, CreateIgnoreAllRead); |
| _rpa->ReadProgressiveAttachmentBy(s_ignore_all_read); |
| } |
| _rpa.reset(NULL); |
| } |
| delete _remote_stream_settings; |
| _thrift_method_name.clear(); |
| _after_rpc_resp_fn = nullptr; |
| |
| CHECK(_unfinished_call == NULL); |
| } |
| |
| void Controller::ResetPods() { |
    // NOTE: Keep the sequence of assignments consistent with the order in
    // which the fields are defined in the header. Better for the CPU cache
    // and faster to look up.
| _span = NULL; |
| _flags = 0; |
| #ifndef BAIDU_INTERNAL |
| set_pb_bytes_to_base64(true); |
| #endif |
| _error_code = 0; |
| _session_local_data = NULL; |
| _server = NULL; |
| _oncancel_id = INVALID_BTHREAD_ID; |
| _auth_context = NULL; |
| _sampled_request = NULL; |
| _request_protocol = PROTOCOL_UNKNOWN; |
| _max_retry = UNSET_MAGIC_NUM; |
| _retry_policy = NULL; |
| _correlation_id = INVALID_BTHREAD_ID; |
| _connection_type = CONNECTION_TYPE_UNKNOWN; |
| _timeout_ms = UNSET_MAGIC_NUM; |
| _backup_request_ms = UNSET_MAGIC_NUM; |
| _connect_timeout_ms = UNSET_MAGIC_NUM; |
| _real_timeout_ms = UNSET_MAGIC_NUM; |
| _deadline_us = -1; |
| _timeout_id = 0; |
| _begin_time_us = 0; |
| _end_time_us = 0; |
| _tos = 0; |
| _preferred_index = -1; |
| _request_compress_type = COMPRESS_TYPE_NONE; |
| _response_compress_type = COMPRESS_TYPE_NONE; |
| _fail_limit = UNSET_MAGIC_NUM; |
| _pipelined_count = 0; |
| _inheritable.Reset(); |
| _pchan_sub_count = 0; |
| _response = NULL; |
| _done = NULL; |
| _sender = NULL; |
| _request_code = 0; |
| _single_server_id = INVALID_SOCKET_ID; |
| _unfinished_call = NULL; |
| _stream_creator = NULL; |
| _accessed = NULL; |
| _pack_request = NULL; |
| _method = NULL; |
| _auth = NULL; |
| _idl_names = idl_single_req_single_res; |
| _idl_result = IDL_VOID_RESULT; |
| _http_request = NULL; |
| _http_response = NULL; |
| _request_user_fields = NULL; |
| _response_user_fields = NULL; |
| _request_stream = INVALID_STREAM_ID; |
| _response_stream = INVALID_STREAM_ID; |
| _remote_stream_settings = NULL; |
| _auth_flags = 0; |
| } |
| |
| Controller::Call::Call(Controller::Call* rhs) |
| : nretry(rhs->nretry) |
| , need_feedback(rhs->need_feedback) |
| , enable_circuit_breaker(rhs->enable_circuit_breaker) |
| , peer_id(rhs->peer_id) |
| , begin_time_us(rhs->begin_time_us) |
| , sending_sock(rhs->sending_sock.release()) |
| , stream_user_data(rhs->stream_user_data) { |
    // NOTE: fields in rhs must be reset because the RPC could fail before
    // all fields are passed on to the next call, in which case
    // _current_call.OnComplete would behave incorrectly.
| rhs->need_feedback = false; |
| rhs->peer_id = INVALID_SOCKET_ID; |
| rhs->stream_user_data = NULL; |
| } |
| |
| Controller::Call::~Call() { |
| CHECK(sending_sock.get() == NULL); |
| } |
| |
| void Controller::Call::Reset() { |
| nretry = 0; |
| need_feedback = false; |
| enable_circuit_breaker = false; |
| peer_id = INVALID_SOCKET_ID; |
| begin_time_us = 0; |
| sending_sock.reset(NULL); |
| stream_user_data = NULL; |
| } |
| |
| void Controller::set_timeout_ms(int64_t timeout_ms) { |
| if (timeout_ms <= 0x7fffffff) { |
| _timeout_ms = timeout_ms; |
| _real_timeout_ms = timeout_ms; |
| } else { |
| _timeout_ms = 0x7fffffff; |
| LOG(WARNING) << "timeout_ms is limited to 0x7fffffff (roughly 24 days)"; |
| } |
| } |
| |
| void Controller::set_backup_request_ms(int64_t timeout_ms) { |
| if (timeout_ms <= 0x7fffffff) { |
| _backup_request_ms = timeout_ms; |
| } else { |
| _backup_request_ms = 0x7fffffff; |
| LOG(WARNING) << "backup_request_ms is limited to 0x7fffffff (roughly 24 days)"; |
| } |
| } |
| |
| void Controller::set_max_retry(int max_retry) { |
| if (max_retry > MAX_RETRY_COUNT) { |
| LOG(WARNING) << "Retry count can't be larger than " |
| << MAX_RETRY_COUNT << ", round it to " |
| << MAX_RETRY_COUNT; |
| _max_retry = MAX_RETRY_COUNT; |
| } else { |
| _max_retry = max_retry; |
| } |
| } |
| |
| void Controller::set_log_id(uint64_t log_id) { |
| add_flag(FLAGS_LOG_ID); |
| _inheritable.log_id = log_id; |
| } |
| |
| |
| bool Controller::Failed() const { |
| return FailedInline(); |
| } |
| |
| std::string Controller::ErrorText() const { |
| return _error_text; |
| } |
| |
| void StartCancel(CallId id) { |
| bthread_id_error(id, ECANCELED); |
| } |
| |
| void Controller::StartCancel() { |
| LOG(FATAL) << "You must call brpc::StartCancel(id) instead!" |
| " because this function is racing with ~Controller() in " |
| " asynchronous calls."; |
| } |
| |
| static const char HEX_ALPHA[] = "0123456789ABCDEF"; |
| void Controller::AppendServerIdentiy() { |
| if (_server == NULL) { |
| return; |
| } |
| if (is_security_mode()) { |
| _error_text.reserve(_error_text.size() + MD5_DIGEST_LENGTH * 2 + 2); |
| _error_text.push_back('['); |
| char ipbuf[64]; |
| int len = snprintf(ipbuf, sizeof(ipbuf), "%s:%d", |
| butil::my_ip_cstr(), _server->listen_address().port); |
| unsigned char digest[MD5_DIGEST_LENGTH]; |
| MD5((const unsigned char*)ipbuf, len, digest); |
| for (size_t i = 0; i < sizeof(digest); ++i) { |
| _error_text.push_back(HEX_ALPHA[digest[i] & 0xF]); |
| _error_text.push_back(HEX_ALPHA[digest[i] >> 4]); |
| } |
| _error_text.push_back(']'); |
| } else { |
| butil::string_appendf(&_error_text, "[%s:%d]", |
| butil::my_ip_cstr(), _server->listen_address().port); |
| } |
| } |
| |
| inline void UpdateResponseHeader(Controller* cntl) { |
| DCHECK(cntl->Failed()); |
| if (cntl->request_protocol() == PROTOCOL_HTTP || |
| cntl->request_protocol() == PROTOCOL_H2) { |
| if (cntl->ErrorCode() != EHTTP) { |
| // Set the related status code |
| cntl->http_response().set_status_code( |
| ErrorCodeToStatusCode(cntl->ErrorCode())); |
| } // else assume that status code is already set along with EHTTP. |
| if (cntl->server() != NULL) { |
            // Override the HTTP body at server-side to convey the error
            // text to the client.
            // The client-side should preserve the body, which may be a piece
            // of usable data rather than error text.
| cntl->response_attachment().clear(); |
| cntl->response_attachment().append(cntl->ErrorText()); |
| } |
| } |
| } |
| |
| void Controller::SetFailed(const std::string& reason) { |
| _error_code = -1; |
| if (!_error_text.empty()) { |
| _error_text.push_back(' '); |
| } |
| if (_current_call.nretry != 0) { |
| butil::string_appendf(&_error_text, "[R%d]", _current_call.nretry); |
| } else { |
| AppendServerIdentiy(); |
| } |
| _error_text.append(reason); |
| if (_span) { |
| _span->set_error_code(_error_code); |
| _span->Annotate(reason); |
| } |
| UpdateResponseHeader(this); |
| } |
| |
| void Controller::SetFailed(int error_code, const char* reason_fmt, ...) { |
| if (error_code == 0) { |
| CHECK(false) << "error_code is 0"; |
| error_code = -1; |
| } |
| _error_code = error_code; |
| if (!_error_text.empty()) { |
| _error_text.push_back(' '); |
| } |
| if (_current_call.nretry != 0) { |
| butil::string_appendf(&_error_text, "[R%d]", _current_call.nretry); |
| } else { |
| AppendServerIdentiy(); |
| } |
| const size_t old_size = _error_text.size(); |
| if (_error_code != -1) { |
| butil::string_appendf(&_error_text, "[E%d]", _error_code); |
| } |
| va_list ap; |
| va_start(ap, reason_fmt); |
| butil::string_vappendf(&_error_text, reason_fmt, ap); |
| va_end(ap); |
| if (_span) { |
| _span->set_error_code(_error_code); |
| _span->AnnotateCStr(_error_text.c_str() + old_size, 0); |
| } |
| UpdateResponseHeader(this); |
| } |
| |
| void Controller::CloseConnection(const char* reason_fmt, ...) { |
| if (_error_code == 0) { |
| _error_code = ECLOSE; |
| } |
| add_flag(FLAGS_CLOSE_CONNECTION); |
| if (!_error_text.empty()) { |
| _error_text.push_back(' '); |
| } |
| if (_current_call.nretry != 0) { |
| butil::string_appendf(&_error_text, "[R%d]", _current_call.nretry); |
| } else { |
| AppendServerIdentiy(); |
| } |
| const size_t old_size = _error_text.size(); |
| if (_error_code != -1) { |
| butil::string_appendf(&_error_text, "[E%d]", _error_code); |
| } |
| va_list ap; |
| va_start(ap, reason_fmt); |
| butil::string_vappendf(&_error_text, reason_fmt, ap); |
| va_end(ap); |
| if (_span) { |
| _span->set_error_code(_error_code); |
| _span->AnnotateCStr(_error_text.c_str() + old_size, 0); |
| } |
| UpdateResponseHeader(this); |
| } |
| |
| bool Controller::IsCanceled() const { |
| SocketUniquePtr sock; |
| return (Socket::Address(_current_call.peer_id, &sock) != 0); |
| } |
| |
| class RunOnCancelThread { |
| public: |
| RunOnCancelThread(google::protobuf::Closure* cb, bthread_id_t id) |
| : _cb(cb), _id(id) {} |
| |
| static void* RunThis(void* arg) { |
| ((RunOnCancelThread*)arg)->Run(); |
| return NULL; |
| } |
| |
| void Run() { |
| _cb->Run(); |
| CHECK_EQ(0, bthread_id_unlock_and_destroy(_id)); |
| delete this; |
| } |
| |
| private: |
| google::protobuf::Closure* _cb; |
| bthread_id_t _id; |
| }; |
| |
| int Controller::RunOnCancel(bthread_id_t id, void* data, int error_code) { |
| if (error_code == 0) { |
| // Called from Controller::ResetNonPods upon Controller's Reset or |
| // destruction, we just call the callback in-place. |
| static_cast<google::protobuf::Closure*>(data)->Run(); |
| CHECK_EQ(0, bthread_id_unlock_and_destroy(id)); |
| return 0; |
| } |
| // Called from Socket::SetFailed, should be infrequent. |
| // To make sure Socket::SetFailed is never blocked, we run the callback |
| // in a new thread. |
| RunOnCancelThread* arg = new RunOnCancelThread( |
| static_cast<google::protobuf::Closure*>(data), id); |
| bthread_t th; |
| CHECK_EQ(0, bthread_start_urgent(&th, NULL, RunOnCancelThread::RunThis, arg)); |
| return 0; |
| } |
| |
| void Controller::NotifyOnCancel(google::protobuf::Closure* callback) { |
| if (NULL == callback) { |
| LOG(WARNING) << "Parameter `callback' is NLLL"; |
| return; |
| } |
| |
| ClosureGuard guard(callback); |
| if (_oncancel_id != INVALID_BTHREAD_ID) { |
| LOG(FATAL) << "NotifyCancel a single call more than once!"; |
| return; |
| } |
| if (bthread_id_create(&_oncancel_id, callback, RunOnCancel) != 0) { |
| PLOG(FATAL) << "Fail to create bthread_id"; |
| return; |
| } |
| SocketUniquePtr sock; |
| if (Socket::Address(_current_call.peer_id, &sock) != 0) { |
| // Connection already broken |
| return; |
| } |
    sock->NotifyOnFailed(_oncancel_id); // Always succeeds
| guard.release(); |
| } |
| |
| void Join(CallId id) { |
| bthread_id_join(id); |
| } |
| |
| void JoinResponse(CallId id) { |
| bthread_id_join(id); |
| } |
| |
| static void HandleTimeout(void* arg) { |
| bthread_id_t correlation_id = { (uint64_t)arg }; |
| bthread_id_error(correlation_id, ERPCTIMEDOUT); |
| } |
| |
| void Controller::OnVersionedRPCReturned(const CompletionInfo& info, |
| bool new_bthread, int saved_error) { |
| // TODO(gejun): Simplify call-ending code. |
| // Intercept previous calls |
| while (info.id != _correlation_id && info.id != current_id()) { |
| if (_unfinished_call && get_id(_unfinished_call->nretry) == info.id) { |
| if (!FailedInline()) { |
| // Continue with successful backup request. |
| break; |
| } |
| // Complete failed backup request. |
| _unfinished_call->OnComplete(this, _error_code, info.responded, false); |
| delete _unfinished_call; |
| _unfinished_call = NULL; |
| } |
| // Ignore all non-backup requests and failed backup requests. |
| _error_code = saved_error; |
| response_attachment().clear(); |
| CHECK_EQ(0, bthread_id_unlock(info.id)); |
| return; |
| } |
| |
| if ((!_error_code && _retry_policy == NULL) || |
| _current_call.nretry >= _max_retry) { |
| goto END_OF_RPC; |
| } |
| if (_error_code == EBACKUPREQUEST) { |
| // Reset timeout if needed |
| int rc = 0; |
| if (timeout_ms() >= 0) { |
| rc = bthread_timer_add( |
| &_timeout_id, |
| butil::microseconds_to_timespec(_deadline_us), |
| HandleTimeout, (void*)_correlation_id.value); |
| } |
| if (rc != 0) { |
| SetFailed(rc, "Fail to add timer"); |
| goto END_OF_RPC; |
| } |
| if (!SingleServer()) { |
| if (_accessed == NULL) { |
| _accessed = ExcludedServers::Create( |
| std::min(_max_retry, RETRY_AVOIDANCE)); |
| if (NULL == _accessed) { |
| SetFailed(ENOMEM, "Fail to create ExcludedServers"); |
| goto END_OF_RPC; |
| } |
| } |
| _accessed->Add(_current_call.peer_id); |
| } |
| // _current_call does not end yet. |
| CHECK(_unfinished_call == NULL); // only one backup request now. |
| _unfinished_call = new (std::nothrow) Call(&_current_call); |
| if (_unfinished_call == NULL) { |
| SetFailed(ENOMEM, "Fail to new Call"); |
| goto END_OF_RPC; |
| } |
| ++_current_call.nretry; |
| add_flag(FLAGS_BACKUP_REQUEST); |
| return IssueRPC(butil::gettimeofday_us()); |
| } else { |
| auto retry_policy = _retry_policy ? _retry_policy : DefaultRetryPolicy(); |
| if (retry_policy->DoRetry(this)) { |
            // The error must come from _current_call because:
            // * we intercepted the error from _unfinished_call in
            //   OnVersionedRPCReturned
            // * ERPCTIMEDOUT/ECANCELED are not retryable errors by default.
| CHECK_EQ(current_id(), info.id) << "error_code=" << _error_code; |
| if (!SingleServer()) { |
| if (_accessed == NULL) { |
| _accessed = ExcludedServers::Create( |
| std::min(_max_retry, RETRY_AVOIDANCE)); |
| if (NULL == _accessed) { |
| SetFailed(ENOMEM, "Fail to create ExcludedServers"); |
| goto END_OF_RPC; |
| } |
| } |
| _accessed->Add(_current_call.peer_id); |
| } |
| _current_call.OnComplete(this, _error_code, info.responded, false); |
| ++_current_call.nretry; |
            // Clear http responses before retrying, otherwise the response
            // may be mixed with older (and undefined) stuff. This was
            // actually not done before r32008.
| if (_http_response) { |
| _http_response->Clear(); |
| } |
| response_attachment().clear(); |
| |
| // Retry backoff. |
| bthread::TaskGroup* g = bthread::tls_task_group; |
| int64_t backoff_time_us = retry_policy->GetBackoffTimeMs(this) * 1000L; |
| if (backoff_time_us > 0 && |
| backoff_time_us < _deadline_us - butil::gettimeofday_us()) { |
                // (The check above skips the backoff when it is longer than
                // the remaining RPC time.)
| if (retry_policy->CanRetryBackoffInPthread() || |
| (g && !g->is_current_pthread_task())) { |
| bthread_usleep(backoff_time_us); |
| } else { |
| LOG(WARNING) << "`CanRetryBackoffInPthread()' returns false, " |
| "skip retry backoff in pthread."; |
| } |
| } |
| return IssueRPC(butil::gettimeofday_us()); |
| } |
| } |
| |
| END_OF_RPC: |
| if (new_bthread && !FLAGS_usercode_in_coroutine) { |
        // [ Essential for -usercode_in_pthread=true ]
        // When -usercode_in_pthread is on, the reserved threads (set by
        // -usercode_backup_threads) may all block on bthread_id_lock in
        // ProcessXXXResponse() until the id is unlocked or destroyed, which
        // is run in a new thread when new_bthread is true. However, since all
        // workers are blocked, the created bthread will never be scheduled,
        // resulting in deadlock.
        // Making the id unlockable before creating the bthread fixes the
        // issue. When -usercode_in_pthread is false, this also removes some
        // useless waiting of the bthreads processing responses.

        // Note[_done]: the callid is destroyed after _done, which possibly
        // takes a lot of time; stop the useless locking early.

        // Note[cid]: When the callid needs to be destroyed in done->Run(),
        // it does not mean that it will be destroyed directly in done->Run();
        // conversely, the callid may still be locked/unlocked many times
        // before being destroyed. E.g. in SelectiveChannel, the callid is
        // referenced by multiple sub-done and only destroyed by the last one.
        // Calling bthread_id_about_to_destroy right here, which makes the id
        // not lockable anymore, is wrong. On the other hand, the combo
        // channels setting FLAGS_DESTROY_CID_IN_DONE to true must be aware of
        // -usercode_in_pthread and avoid deadlock on their own (TBR)
| |
| if ((FLAGS_usercode_in_pthread || _done != NULL/*Note[_done]*/) && |
| !has_flag(FLAGS_DESTROY_CID_IN_DONE)/*Note[cid]*/) { |
| bthread_id_about_to_destroy(info.id); |
| } |
| // No need to join this bthread since RPC caller won't wake up |
| // (or user's done won't be called) until this bthread finishes |
| bthread_t bt; |
| bthread_attr_t attr = (FLAGS_usercode_in_pthread ? |
| BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL); |
| _tmp_completion_info = info; |
| if (bthread_start_background(&bt, &attr, RunEndRPC, this) != 0) { |
| LOG(FATAL) << "Fail to start bthread"; |
| EndRPC(info); |
| } |
| } else { |
| if (_done != NULL/*Note[_done]*/ && |
| !has_flag(FLAGS_DESTROY_CID_IN_DONE)/*Note[cid]*/) { |
| bthread_id_about_to_destroy(info.id); |
| } |
| EndRPC(info); |
| } |
| } |
| |
| void* Controller::RunEndRPC(void* arg) { |
| Controller* c = static_cast<Controller*>(arg); |
| c->EndRPC(c->_tmp_completion_info); |
| return NULL; |
| } |
| |
| inline bool does_error_affect_main_socket(int error_code) { |
| // Errors tested in this function are reported by pooled connections |
| // and very likely to indicate that the server-side is down and the socket |
| // should be health-checked. |
| return error_code == ECONNREFUSED || |
| error_code == ENETUNREACH || |
| error_code == EHOSTUNREACH || |
| error_code == EINVAL/*returned by connect "0.0.0.1"*/; |
| } |
| |
// Note: An RPC call may consist of several individual Calls, such as
//  retries and backup requests. This method only cares about the error of
//  this very Call (specified by |error_code|) rather than the error of the
//  entire RPC (specified by c->FailedInline()).
| void Controller::Call::OnComplete( |
| Controller* c, int error_code/*note*/, bool responded, bool end_of_rpc) { |
| if (stream_user_data) { |
| stream_user_data->DestroyStreamUserData(sending_sock, c, error_code, end_of_rpc); |
| stream_user_data = NULL; |
| } |
| |
| if (sending_sock != NULL) { |
| if (error_code != 0) { |
| sending_sock->AddRecentError(); |
| } |
| |
| if (enable_circuit_breaker) { |
| sending_sock->FeedbackCircuitBreaker(error_code, |
| butil::gettimeofday_us() - begin_time_us); |
| } |
| } |
| |
| switch (c->connection_type()) { |
| case CONNECTION_TYPE_UNKNOWN: |
| break; |
| case CONNECTION_TYPE_SINGLE: |
| // Set main socket to be failed for connection refusal of streams. |
| // "single" streams are often maintained in a separate SocketMap and |
| // different from the main socket as well. |
| if (c->_stream_creator != NULL && |
| does_error_affect_main_socket(error_code) && |
| (sending_sock == NULL || sending_sock->id() != peer_id)) { |
| Socket::SetFailed(peer_id); |
| } |
| break; |
| case CONNECTION_TYPE_POOLED: |
        // NOTE: Don't reuse the pooled connection if this call failed and no
        // response has been received through this connection.
        // Otherwise in-flight responses may come back in the future and
        // break the assumption that one pooled connection cannot carry more
        // than one message at the same time.
| if (sending_sock != NULL && (error_code == 0 || responded)) { |
| if (!sending_sock->is_read_progressive()) { |
| // Normally-read socket which will not be used after RPC ends, |
| // safe to return. Notice that Socket::is_read_progressive may |
| // differ from Controller::is_response_read_progressively() |
| // because RPC possibly ends before setting up the socket. |
| sending_sock->ReturnToPool(); |
| } else { |
| // Progressively-read socket. Should be returned when the read |
| // ends. The method handles the details. |
| sending_sock->OnProgressiveReadCompleted(); |
| } |
| break; |
| } |
| // fall through |
| case CONNECTION_TYPE_SHORT: |
| if (sending_sock != NULL) { |
| // Check the comment in CONNECTION_TYPE_POOLED branch. |
| if (!sending_sock->is_read_progressive()) { |
| if (c->_stream_creator == NULL) { |
| sending_sock->SetFailed(); |
| } |
| } else { |
| sending_sock->OnProgressiveReadCompleted(); |
| } |
| } |
| if (does_error_affect_main_socket(error_code)) { |
| // main socket should die as well. |
| // NOTE: main socket may be wrongly set failed (provided that |
| // short/pooled socket does not hold a ref of the main socket). |
| // E.g. an in-parallel RPC sets the peer_id to be failed |
| // -> this RPC meets ECONNREFUSED |
| // -> main socket gets revived from HC |
| // -> this RPC sets main socket to be failed again. |
| Socket::SetFailed(peer_id); |
| } |
| break; |
| } |
| |
| if (ELOGOFF == error_code) { |
| SocketUniquePtr sock; |
| if (Socket::Address(peer_id, &sock) == 0) { |
| // Block this `Socket' while not closing the fd |
| sock->SetLogOff(); |
| } |
| } |
| |
| if (need_feedback) { |
| const LoadBalancer::CallInfo info = |
| { begin_time_us, peer_id, error_code, c }; |
| c->_lb->Feedback(info); |
| } |
| |
| // Release the `Socket' we used to send/receive data |
| sending_sock.reset(NULL); |
| } |
| |
| void Controller::EndRPC(const CompletionInfo& info) { |
| if (_timeout_id != 0) { |
| bthread_timer_del(_timeout_id); |
| _timeout_id = 0; |
| } |
| |
| // End _current_call and _unfinished_call. |
| if (info.id == current_id() || info.id == _correlation_id) { |
| if (_current_call.sending_sock != NULL) { |
| _remote_side = _current_call.sending_sock->remote_side(); |
| _local_side = _current_call.sending_sock->local_side(); |
| } |
| |
| if (_unfinished_call != NULL) { |
            // When _current_call succeeded, mark _unfinished_call as
            // EBACKUPREQUEST. We can't use 0 because the server possibly
            // never responds; we can't use ERPCTIMEDOUT because
            // _current_call was sent after _unfinished_call, which did not
            // necessarily time out. When _current_call failed, mark
            // _unfinished_call with the same error. This is not accurate
            // either, but we have to end _unfinished_call with some sort of
            // error anyway.
| const int err = (_error_code == 0 ? EBACKUPREQUEST : _error_code); |
| _unfinished_call->OnComplete(this, err, false, false); |
| delete _unfinished_call; |
| _unfinished_call = NULL; |
| } |
| // TODO: Replace this with stream_creator. |
| HandleStreamConnection(_current_call.sending_sock.get()); |
| _current_call.OnComplete(this, _error_code, info.responded, true); |
| } else { |
        // Even if _unfinished_call succeeded, we don't use EBACKUPREQUEST
        // (which gets punished in LALB) for _current_call because
        // _current_call was sent after _unfinished_call; it's just normal
        // that _current_call does not respond before _unfinished_call.
| if (_unfinished_call == NULL) { |
| CHECK(false) << "A previous non-backup request responded, cid=" |
| << info.id << " current_cid=" << current_id() |
| << " initial_cid=" << _correlation_id |
| << " stream_user_data=" << _current_call.stream_user_data |
| << " sending_sock=" << _current_call.sending_sock.get(); |
| } |
| _current_call.OnComplete(this, ECANCELED, false, false); |
| if (_unfinished_call != NULL) { |
| if (_unfinished_call->sending_sock != NULL) { |
| _remote_side = _unfinished_call->sending_sock->remote_side(); |
| _local_side = _unfinished_call->sending_sock->local_side(); |
| } |
| // TODO: Replace this with stream_creator. |
| HandleStreamConnection(_unfinished_call->sending_sock.get()); |
| if (get_id(_unfinished_call->nretry) == info.id) { |
| _unfinished_call->OnComplete( |
| this, _error_code, info.responded, true); |
| } else { |
| CHECK(false) << "A previous non-backup request responded"; |
| _unfinished_call->OnComplete(this, ECANCELED, false, true); |
| } |
| |
| delete _unfinished_call; |
| _unfinished_call = NULL; |
| } |
| } |
| if (_stream_creator) { |
| _stream_creator->DestroyStreamCreator(this); |
| _stream_creator = NULL; |
| } |
    // Clear _error_text when the call succeeded, otherwise a successful
    // call with non-empty ErrorText may confuse the user.
| if (!_error_code) { |
| _error_text.clear(); |
| } |
| // RPC finished, now it's safe to release `LoadBalancerWithNaming' |
| _lb.reset(); |
| if (_span) { |
| _span->set_ending_cid(info.id); |
| _span->set_async(_done); |
| // Submit the span if we're in async RPC. For sync RPC, the span |
| // is submitted after Join() to get a more accurate resuming timestamp. |
| if (_done) { |
| SubmitSpan(); |
| } |
| } |
| |
| // No need to retry or can't retry, just call user's `done'. |
| const CallId saved_cid = _correlation_id; |
| if (_done) { |
| if (!FLAGS_usercode_in_pthread || _done == DoNothing()/*Note*/) { |
| // Note: no need to run DoNothing in backup thread when pthread |
| // mode is on. Otherwise there's a tricky deadlock: |
| // void SomeService::CallMethod(...) { // -usercode_in_pthread=true |
| // ... |
| // channel.CallMethod(...., brpc::DoNothing()); |
| // brpc::Join(cntl.call_id()); |
| // ... |
| // } |
| // Join is not signalled when the done does not Run() and the done |
| // can't Run() because all backup threads are blocked by Join(). |
| |
| OnRPCEnd(butil::gettimeofday_us()); |
| const bool destroy_cid_in_done = has_flag(FLAGS_DESTROY_CID_IN_DONE); |
| _done->Run(); |
| // NOTE: Don't touch this Controller anymore, because it's likely to be |
| // deleted by done. |
| if (!destroy_cid_in_done) { |
| // Make this thread not scheduling itself when launching new |
| // bthreads, saving signalings. |
| // FIXME: We're assuming the calling thread is about to quit. |
| bthread_about_to_quit(); |
| CHECK_EQ(0, bthread_id_unlock_and_destroy(saved_cid)); |
| } |
| } else { |
| RunUserCode(RunDoneInBackupThread, this); |
| } |
| } else { |
| // OnRPCEnd for sync RPC is called in Channel::CallMethod to count in |
| // latency of the context-switch. |
| |
| // Check comments in above branch on bthread_about_to_quit. |
| bthread_about_to_quit(); |
| CHECK_EQ(0, bthread_id_unlock_and_destroy(saved_cid)); |
| } |
| } |
| void Controller::RunDoneInBackupThread(void* arg) { |
| static_cast<Controller*>(arg)->DoneInBackupThread(); |
| } |
| |
| void Controller::DoneInBackupThread() { |
| // OnRPCEnd for sync RPC is called in Channel::CallMethod to count in |
| // latency of the context-switch. |
| OnRPCEnd(butil::gettimeofday_us()); |
| const CallId saved_cid = _correlation_id; |
| const bool destroy_cid_in_done = has_flag(FLAGS_DESTROY_CID_IN_DONE); |
| _done->Run(); |
| // NOTE: Don't touch fields of controller anymore, it may be deleted. |
| if (!destroy_cid_in_done) { |
| CHECK_EQ(0, bthread_id_unlock_and_destroy(saved_cid)); |
| } |
| } |
| |
| void Controller::SubmitSpan() { |
| const int64_t now = butil::cpuwide_time_us(); |
| _span->set_start_callback_us(now); |
| if (_span->local_parent()) { |
| _span->local_parent()->AsParent(); |
| } |
| Span::Submit(_span, now); |
| _span = NULL; |
| } |
| |
| void Controller::HandleSendFailed() { |
| if (!FailedInline()) { |
| SetFailed("Must be SetFailed() before calling HandleSendFailed()"); |
| LOG(FATAL) << ErrorText(); |
| } |
| const CompletionInfo info = { current_id(), false }; |
    // NOTE: Launch a new thread to run the callback in an asynchronous call
    // (when done is not allowed to run in-place).
    // Users may hold a lock before the asynchronous CallMethod returns and
    // grab the same lock inside done->Run(). If done->Run() were called in
    // the same stack as CallMethod, the code would deadlock.
    // We don't need to run the callback in a new thread in a sync call since
    // the created thread needs to be joined anyway before CallMethod ends.
| const bool new_bthread = (_done != NULL && !is_done_allowed_to_run_in_place()); |
| OnVersionedRPCReturned(info, new_bthread, _error_code); |
| } |
| |
| void Controller::IssueRPC(int64_t start_realtime_us) { |
| _current_call.begin_time_us = start_realtime_us; |
| |
    // If there were retries/backup requests, recalculate the remaining timeout.
| if (_real_timeout_ms > 0) { |
| _real_timeout_ms -= (start_realtime_us - _begin_time_us) / 1000; |
| } |
| |
    // Clear the last error. Don't clear _error_text because we append to it.
| _error_code = 0; |
| |
| // Make versioned correlation_id. |
| // call_id : unversioned, mainly for ECANCELED and ERPCTIMEDOUT |
| // call_id + 1 : first try. |
| // call_id + 2 : retry 1 |
| // ... |
| // call_id + N + 1 : retry N |
| // All ids except call_id are versioned. Say if we've sent retry 1 and |
| // a failed response of first try comes back, it will be ignored. |
| const CallId cid = current_id(); |
| |
| // Intercept IssueRPC when _sender is set. Currently _sender is only set |
| // by SelectiveChannel. |
| if (_sender) { |
| if (_sender->IssueRPC(start_realtime_us) != 0) { |
| return HandleSendFailed(); |
| } |
| CHECK_EQ(0, bthread_id_unlock(cid)); |
| return; |
| } |
| |
| // Pick a target server for sending RPC |
| _current_call.need_feedback = false; |
| _current_call.enable_circuit_breaker = has_enabled_circuit_breaker(); |
| SocketUniquePtr tmp_sock; |
| if (SingleServer()) { |
| // Don't use _current_call.peer_id which is set to -1 after construction |
| // of the backup call. |
| const int rc = Socket::Address(_single_server_id, &tmp_sock); |
| if (rc != 0 || (!is_health_check_call() && !tmp_sock->IsAvailable())) { |
| SetFailed(EHOSTDOWN, "Not connected to %s yet, server_id=%" PRIu64, |
| endpoint2str(_remote_side).c_str(), _single_server_id); |
| tmp_sock.reset(); // Release ref ASAP |
| return HandleSendFailed(); |
| } |
| _current_call.peer_id = _single_server_id; |
| } else { |
| LoadBalancer::SelectIn sel_in = |
| { start_realtime_us, true, |
| has_request_code(), _request_code, _accessed }; |
| LoadBalancer::SelectOut sel_out(&tmp_sock); |
| const int rc = _lb->SelectServer(sel_in, &sel_out); |
| if (rc != 0) { |
| std::ostringstream os; |
| DescribeOptions opt; |
| opt.verbose = false; |
| _lb->Describe(os, opt); |
| SetFailed(rc, "Fail to select server from %s", os.str().c_str()); |
| return HandleSendFailed(); |
| } |
| _current_call.need_feedback = sel_out.need_feedback; |
| _current_call.peer_id = tmp_sock->id(); |
| // NOTE: _remote_side must be set here because _pack_request below |
| // may need it (e.g. http may set "Host" to _remote_side) |
| // Don't set _local_side here because tmp_sock may be not connected |
| // here. |
| _remote_side = tmp_sock->remote_side(); |
| } |
| if (_stream_creator) { |
| _current_call.stream_user_data = |
| _stream_creator->OnCreatingStream(&tmp_sock, this); |
| if (FailedInline()) { |
| return HandleSendFailed(); |
| } |
| // remote_side can't be changed. |
| CHECK_EQ(_remote_side, tmp_sock->remote_side()); |
| } |
| |
| Span* span = _span; |
| if (span) { |
| if (_current_call.nretry == 0) { |
| span->set_remote_side(_remote_side); |
| } else { |
| span->Annotate("Retrying %s", |
| endpoint2str(_remote_side).c_str()); |
| } |
| } |
| // Handle connection type |
    if (_connection_type == CONNECTION_TYPE_SINGLE ||
        _stream_creator != NULL) { // let the user decide the sending_sock
        // directly in the callback (according to connection_type)
| _current_call.sending_sock.reset(tmp_sock.release()); |
        // TODO(gejun): Setting the preferred index of a single-connected
        // socket has two issues:
        // 1. race conditions. If a set preferred_index is overwritten by
        //    another thread, the response coming back has to check protocols
        //    one by one. This is a performance issue; correctness is
        //    unaffected.
        // 2. thrashing between different protocols. Also a performance issue.
| _current_call.sending_sock->set_preferred_index(_preferred_index); |
| } else { |
| int rc = 0; |
| if (_connection_type == CONNECTION_TYPE_POOLED) { |
| rc = tmp_sock->GetPooledSocket(&_current_call.sending_sock); |
| } else if (_connection_type == CONNECTION_TYPE_SHORT) { |
| rc = tmp_sock->GetShortSocket(&_current_call.sending_sock); |
| } else { |
| tmp_sock.reset(); |
| SetFailed(EINVAL, "Invalid connection_type=%d", (int)_connection_type); |
| return HandleSendFailed(); |
| } |
| if (rc) { |
| tmp_sock.reset(); |
| SetFailed(rc, "Fail to get %s connection", |
| ConnectionTypeToString(_connection_type)); |
| return HandleSendFailed(); |
| } |
| // Remember the preferred protocol for non-single connection. When |
| // the response comes back, InputMessenger calls the right handler |
| // w/o trying other protocols. This is a must for (many) protocols that |
| // can't be distinguished from other protocols w/o ambiguity. |
| _current_call.sending_sock->set_preferred_index(_preferred_index); |
| // Set preferred_index of main_socket as well to make it easier to |
| // debug and observe from /connections. |
| if (tmp_sock->preferred_index() < 0) { |
| tmp_sock->set_preferred_index(_preferred_index); |
| } |
| tmp_sock.reset(); |
| } |
| if (_tos > 0) { |
| _current_call.sending_sock->set_type_of_service(_tos); |
| } |
| if (is_response_read_progressively()) { |
| // Tag the socket so that when the response comes back, the parser will |
| // stop before reading all body. |
| _current_call.sending_sock->read_will_be_progressive(_connection_type); |
| } |
| |
| // Handle authentication |
| const Authenticator* using_auth = NULL; |
| if (_auth != NULL) { |
| // Only one thread will be the winner and get the right to pack |
| // authentication information, others wait until the request |
| // is sent. |
| int auth_error = 0; |
| if (_current_call.sending_sock->FightAuthentication(&auth_error) == 0) { |
| using_auth = _auth; |
| } else if (auth_error != 0) { |
| SetFailed(auth_error, "Fail to authenticate, %s", |
| berror(auth_error)); |
| return HandleSendFailed(); |
| } |
| } |
| // Make request |
| butil::IOBuf packet; |
| SocketMessage* user_packet = NULL; |
| _pack_request(&packet, &user_packet, cid.value, _method, this, |
| _request_buf, using_auth); |
| // TODO: PackRequest may accept SocketMessagePtr<>? |
| SocketMessagePtr<> user_packet_guard(user_packet); |
| if (FailedInline()) { |
| // controller should already be SetFailed. |
| if (using_auth) { |
| // Don't forget to signal waiters on authentication |
| _current_call.sending_sock->SetAuthentication(ErrorCode()); |
| } |
| return HandleSendFailed(); |
| } |
| |
| timespec connect_abstime; |
| timespec* pabstime = NULL; |
| if (_connect_timeout_ms > 0) { |
| if (_deadline_us >= 0) { |
| connect_abstime = butil::microseconds_to_timespec( |
| std::min(_connect_timeout_ms * 1000L + start_realtime_us, |
| _deadline_us)); |
| } else { |
| connect_abstime = butil::microseconds_to_timespec( |
| _connect_timeout_ms * 1000L + start_realtime_us); |
| } |
| pabstime = &connect_abstime; |
| } |
| Socket::WriteOptions wopt; |
| wopt.id_wait = cid; |
| wopt.abstime = pabstime; |
| wopt.pipelined_count = _pipelined_count; |
| wopt.auth_flags = _auth_flags; |
| wopt.ignore_eovercrowded = has_flag(FLAGS_IGNORE_EOVERCROWDED); |
| wopt.write_in_background = write_to_socket_in_background(); |
| int rc; |
| size_t packet_size = 0; |
| if (user_packet_guard) { |
| if (span) { |
| packet_size = user_packet_guard->EstimatedByteSize(); |
| } |
| rc = _current_call.sending_sock->Write(user_packet_guard, &wopt); |
| } else { |
| packet_size = packet.size(); |
| rc = _current_call.sending_sock->Write(&packet, &wopt); |
| } |
| if (span) { |
| if (_current_call.nretry == 0) { |
| span->set_sent_us(butil::cpuwide_time_us()); |
| span->set_request_size(packet_size); |
| } else { |
| span->Annotate("Requested(%lld) [%d]", |
| (long long)packet_size, _current_call.nretry + 1); |
| } |
| } |
| if (using_auth) { |
        // For performance concerns, we set authentication immediately
        // after the first `Write' returns instead of waiting for the server
        // to confirm the credential data
| _current_call.sending_sock->SetAuthentication(rc); |
| } |
| CHECK_EQ(0, bthread_id_unlock(cid)); |
| } |
| |
| void Controller::set_auth_context(const AuthContext* ctx) { |
| if (_auth_context != NULL) { |
| LOG(FATAL) << "Impossible! This function is supposed to be called " |
| "only once when verification succeeds in server side"; |
| return; |
| } |
    // Ownership belongs to `Socket' instead of `Controller'
| _auth_context = ctx; |
| } |
| |
| int Controller::HandleSocketFailed(bthread_id_t id, void* data, int error_code, |
| const std::string& error_text) { |
| Controller* cntl = static_cast<Controller*>(data); |
| if (!cntl->is_used_by_rpc()) { |
        // Cannot destroy the call_id before the RPC starts, otherwise an
        // async RPC using the controller cannot be joined and related
        // resources may be destroyed before done->Run() runs in another
        // bthread.
        // The error set here will be detected in Channel::CallMethod and
        // fail the RPC.
| cntl->SetFailed(error_code, "Cancel call_id=%" PRId64 |
| " before CallMethod()", id.value); |
| return bthread_id_unlock(id); |
| } |
| const int saved_error = cntl->ErrorCode(); |
| if (error_code == ERPCTIMEDOUT) { |
| cntl->SetFailed(error_code, "Reached timeout=%" PRId64 "ms @%s", |
| cntl->timeout_ms(), |
| butil::endpoint2str(cntl->remote_side()).c_str()); |
| } else if (error_code == EBACKUPREQUEST) { |
| cntl->SetFailed(error_code, "Reached backup timeout=%" PRId64 "ms @%s", |
| cntl->backup_request_ms(), |
| butil::endpoint2str(cntl->remote_side()).c_str()); |
| } else if (!error_text.empty()) { |
| cntl->SetFailed(error_code, "%s", error_text.c_str()); |
| } else { |
| cntl->SetFailed(error_code, "%s @%s", berror(error_code), |
| butil::endpoint2str(cntl->remote_side()).c_str()); |
| } |
| |
| struct OnVersionedRPCReturnedArgs { |
| bthread_id_t id; |
| Controller* cntl; |
| int error; |
| }; |
| auto func = [](void* p) -> void* { |
| std::unique_ptr<OnVersionedRPCReturnedArgs> args(static_cast<OnVersionedRPCReturnedArgs*>(p)); |
| CompletionInfo info = { args->id, false }; |
| args->cntl->OnVersionedRPCReturned(info, true, args->error); |
| return NULL; |
| }; |
| |
| auto* args = new OnVersionedRPCReturnedArgs{ id, cntl, saved_error }; |
| bthread_t tid; |
    // RetryPolicy may block the current bthread, so start a new bthread to
    // run OnVersionedRPCReturned.
| if (!cntl->_retry_policy || bthread_start_background(&tid, NULL, func, args) != 0) { |
| func(args); |
| } |
| return 0; |
| } |
| |
| CallId Controller::call_id() { |
| butil::atomic<uint64_t>* target = |
| (butil::atomic<uint64_t>*)&_correlation_id.value; |
| uint64_t loaded = target->load(butil::memory_order_relaxed); |
| if (loaded) { |
| const CallId id = { loaded }; |
| return id; |
| } |
| // Optimistic locking. |
| CallId cid = { 0 }; |
| // The range of this id will be reset in Channel::CallMethod |
| CHECK_EQ(0, bthread_id_create2(&cid, this, HandleSocketFailed)); |
| if (!target->compare_exchange_strong(loaded, cid.value, |
| butil::memory_order_relaxed)) { |
| bthread_id_cancel(cid); |
| cid.value = loaded; |
| } |
| return cid; |
| } |
| |
| void Controller::SaveClientSettings(ClientSettings* s) const { |
| s->timeout_ms = _timeout_ms; |
| s->backup_request_ms = _backup_request_ms; |
| s->max_retry = _max_retry; |
| s->tos = _tos; |
| s->connection_type = _connection_type; |
| s->request_compress_type = _request_compress_type; |
| s->log_id = log_id(); |
| s->has_request_code = has_request_code(); |
| s->request_code = _request_code; |
| } |
| |
| void Controller::ApplyClientSettings(const ClientSettings& s) { |
| set_timeout_ms(s.timeout_ms); |
| set_backup_request_ms(s.backup_request_ms); |
| set_max_retry(s.max_retry); |
| set_type_of_service(s.tos); |
| set_connection_type(s.connection_type); |
| set_request_compress_type(s.request_compress_type); |
| set_log_id(s.log_id); |
| set_flag(FLAGS_REQUEST_CODE, s.has_request_code); |
| _request_code = s.request_code; |
| } |
| |
| int Controller::sub_count() const { |
| int n = _pchan_sub_count; |
| if (_sender) { |
| n += !!GetSubControllerOfSelectiveChannel(_sender, 0); |
| } |
| return n; |
| } |
| |
| const Controller* Controller::sub(int index) const { |
| if (_pchan_sub_count > 0 && _done != NULL) { |
| return GetSubControllerOfParallelChannel(_done, index); |
| } |
| if (_sender != NULL) { |
| return GetSubControllerOfSelectiveChannel(_sender, index); |
| } |
| return NULL; |
| } |
| |
| uint64_t Controller::trace_id() const { return _span ? _span->trace_id() : 0; } |
| uint64_t Controller::span_id() const { return _span ? _span->span_id() : 0; } |
| |
| void* Controller::session_local_data() { |
| if (_session_local_data) { |
| return _session_local_data; |
| } |
| if (_server) { |
| SimpleDataPool* pool = _server->_session_local_data_pool; |
| if (pool) { |
| _session_local_data = pool->Borrow(); |
| return _session_local_data; |
| } |
| } |
| return NULL; |
| } |
| |
| void Controller::HandleStreamConnection(Socket *host_socket) { |
| if (_request_stream == INVALID_STREAM_ID) { |
| CHECK(!has_remote_stream()); |
| return; |
| } |
| SocketUniquePtr ptr; |
| if (!FailedInline()) { |
| if (Socket::Address(_request_stream, &ptr) != 0) { |
| if (!FailedInline()) { |
| SetFailed(EREQUEST, "Request stream=%" PRIu64 " was closed before responded", |
| _request_stream); |
| } |
| } else if (_remote_stream_settings == NULL) { |
| if (!FailedInline()) { |
| SetFailed(EREQUEST, "The server didn't accept the stream"); |
| } |
| } |
| } |
| if (FailedInline()) { |
| Stream::SetFailed(_request_stream); |
| if (_remote_stream_settings != NULL) { |
| policy::SendStreamRst(host_socket, |
| _remote_stream_settings->stream_id()); |
| } |
| return; |
| } |
| Stream* s = (Stream*)ptr->conn(); |
| s->SetConnected(_remote_stream_settings); |
| } |
| |
| // TODO: Need more security advices from professionals. |
| // TODO: Is percent encoding better? |
| void WebEscape(const std::string& source, std::string* output) { |
| output->reserve(source.length() + 10); |
| for (size_t pos = 0; pos != source.size(); ++pos) { |
| switch (source[pos]) { |
| case '&': output->append("&"); break; |
| case '\"': output->append("""); break; |
| case '\'': output->append("'"); break; |
| case '<': output->append("<"); break; |
| case '>': output->append(">"); break; |
| default: output->push_back(source[pos]); break; |
| } |
| } |
| } |
| |
| std::string WebEscape(const std::string& source) { |
| std::string output; |
| WebEscape(source, &output); |
| return output; |
| } |
| |
| void Controller::reset_sampled_request(SampledRequest* req) { |
| delete _sampled_request; |
| _sampled_request = req; |
| } |
| |
| void Controller::set_stream_creator(StreamCreator* sc) { |
| if (_stream_creator) { |
| LOG(FATAL) << "A StreamCreator has been set previously"; |
| return; |
| } |
| _stream_creator = sc; |
| } |
| |
| butil::intrusive_ptr<ProgressiveAttachment> |
| Controller::CreateProgressiveAttachment(StopStyle stop_style) { |
| if (has_progressive_writer()) { |
| LOG(ERROR) << "One controller can only have one ProgressiveAttachment"; |
| return NULL; |
| } |
| if (_request_protocol != PROTOCOL_HTTP) { |
| LOG(ERROR) << "Only http supports ProgressiveAttachment now"; |
| return NULL; |
| } |
| if (_current_call.sending_sock == NULL) { |
| LOG(ERROR) << "sending_sock is NULL"; |
| return NULL; |
| } |
| SocketUniquePtr httpsock; |
| _current_call.sending_sock->ReAddress(&httpsock); |
| |
| if (stop_style == FORCE_STOP) { |
| httpsock->fail_me_at_server_stop(); |
| } |
| _wpa.reset(new ProgressiveAttachment( |
| httpsock, http_request().before_http_1_1())); |
| return _wpa; |
| } |
| |
| void Controller::ReadProgressiveAttachmentBy(ProgressiveReader* r) { |
| if (r == NULL) { |
| LOG(FATAL) << "Param[r] is NULL"; |
| return; |
| } |
| if (!is_response_read_progressively()) { |
| return r->OnEndOfMessage( |
| butil::Status(EINVAL, "Can't read progressive attachment from a " |
| "controller without calling " |
| "response_will_be_read_progressively() before")); |
| } |
| if (_rpa == NULL) { |
| return r->OnEndOfMessage( |
| butil::Status(EINVAL, "ReadableProgressiveAttachment is NULL")); |
| } |
| if (has_progressive_reader()) { |
| return r->OnEndOfMessage( |
| butil::Status(EPERM, "%s can't be called more than once", |
| __FUNCTION__)); |
| } |
| add_flag(FLAGS_PROGRESSIVE_READER); |
| return _rpa->ReadProgressiveAttachmentBy(r); |
| } |
| |
| void Controller::set_mongo_session_data(MongoContext* data) { |
| _mongo_session_data = data; |
| } |
| |
| bool Controller::is_ssl() const { |
| Socket* s = _current_call.sending_sock.get(); |
| return s != NULL && s->is_ssl(); |
| } |
| |
| x509_st* Controller::get_peer_certificate() const { |
| Socket* s = _current_call.sending_sock.get(); |
| return s ? s->GetPeerCertificate() : NULL; |
| } |
| |
| int Controller::GetSockOption(int level, int optname, void* optval, socklen_t* optlen) { |
| Socket* s = _current_call.sending_sock.get(); |
| if (s) { |
| return getsockopt(s->fd(), level, optname, optval, optlen); |
| } else { |
| errno = EBADF; |
| return -1; |
| } |
| } |
| |
| void Controller::CallAfterRpcResp(const google::protobuf::Message* req, const google::protobuf::Message* res) { |
| if (_after_rpc_resp_fn) { |
| _after_rpc_resp_fn(this, req, res); |
| _after_rpc_resp_fn = nullptr; |
| } |
| } |
| |
| #if defined(OS_MACOSX) |
| typedef sig_t SignalHandler; |
| #else |
| typedef sighandler_t SignalHandler; |
| #endif |
| |
| static volatile bool s_signal_quit = false; |
| static SignalHandler s_prev_sigint_handler = NULL; |
| static SignalHandler s_prev_sigterm_handler = NULL; |
| static SignalHandler s_prev_sighup_handler = NULL; |
| |
| static void quit_handler(int signo) { |
| s_signal_quit = true; |
| if (SIGINT == signo && s_prev_sigint_handler) { |
| s_prev_sigint_handler(signo); |
| } |
| if (SIGTERM == signo && s_prev_sigterm_handler) { |
| s_prev_sigterm_handler(signo); |
| } |
| if (SIGHUP == signo && s_prev_sighup_handler) { |
| s_prev_sighup_handler(signo); |
| } |
| } |
| |
| static pthread_once_t register_quit_signal_once = PTHREAD_ONCE_INIT; |
| |
| static void RegisterQuitSignalOrDie() { |
| // Not thread-safe. |
| SignalHandler prev = signal(SIGINT, quit_handler); |
| if (prev != SIG_DFL && |
| prev != SIG_IGN) { // shell may install SIGINT of background jobs with SIG_IGN |
| RELEASE_ASSERT_VERBOSE(prev != SIG_ERR, |
| "Fail to register SIGINT, abort"); |
| s_prev_sigint_handler = prev; |
| LOG(WARNING) << "SIGINT was installed with " << prev; |
| } |
| |
| if (FLAGS_graceful_quit_on_sigterm) { |
| prev = signal(SIGTERM, quit_handler); |
| if (prev != SIG_DFL && |
| prev != SIG_IGN) { // shell may install SIGTERM of background jobs with SIG_IGN |
| RELEASE_ASSERT_VERBOSE(prev != SIG_ERR, |
| "Fail to register SIGTERM, abort"); |
| s_prev_sigterm_handler = prev; |
| LOG(WARNING) << "SIGTERM was installed with " << prev; |
| } |
| } |
| |
| if (FLAGS_graceful_quit_on_sighup) { |
| prev = signal(SIGHUP, quit_handler); |
| if (prev != SIG_DFL && |
| prev != SIG_IGN) { // shell may install SIGHUP of background jobs with SIG_IGN |
| RELEASE_ASSERT_VERBOSE(prev != SIG_ERR, |
| "Fail to register SIGHUP, abort"); |
| s_prev_sighup_handler = prev; |
| LOG(WARNING) << "SIGHUP was installed with " << prev; |
| } |
| } |
| } |
| |
| bool IsAskedToQuit() { |
| pthread_once(®ister_quit_signal_once, RegisterQuitSignalOrDie); |
| return s_signal_quit; |
| } |
| |
| void AskToQuit() { |
| raise(SIGINT); |
| } |
| |
| class DoNothingClosure : public google::protobuf::Closure { |
| void Run() { } |
| }; |
| google::protobuf::Closure* DoNothing() { |
| return butil::get_leaky_singleton<DoNothingClosure>(); |
| } |
| |
| KVMap& Controller::SessionKV() { |
| if (_session_kv == nullptr) { |
| _session_kv.reset(new KVMap); |
| } |
| return *_session_kv.get(); |
| } |
| |
| #define BRPC_SESSION_END_MSG "Session ends." |
| #define BRPC_REQ_ID "@rid" |
| #define BRPC_KV_SEP "=" |
| |
| void Controller::FlushSessionKV(std::ostream& os) { |
| if (_session_kv == nullptr || _session_kv->Count() == 0) { |
| return; |
| } |
| |
| const std::string* pRID = nullptr; |
| if (!request_id().empty()) { |
| pRID = &request_id(); |
| } |
| |
| if (FLAGS_log_as_json) { |
| if (pRID) { |
| os << "\"" BRPC_REQ_ID "\":\"" << *pRID << "\","; |
| } |
| os << "\"M\":\"" BRPC_SESSION_END_MSG "\""; |
| for (auto it = _session_kv->Begin(); it != _session_kv->End(); ++it) { |
| os << ",\"" << it->first << "\":\"" << it->second << '"'; |
| } |
| } else { |
| if (pRID) { |
| os << BRPC_REQ_ID BRPC_KV_SEP << *pRID << " "; |
| } |
| os << BRPC_SESSION_END_MSG; |
| for (auto it = _session_kv->Begin(); it != _session_kv->End(); ++it) { |
| os << ' ' << it->first << BRPC_KV_SEP << it->second; |
| } |
| } |
| } |
| |
| std::ostream& operator<<(std::ostream& os, const Controller::LogPrefixDummy& p) { |
| p.DoPrintLogPrefix(os); |
| return os; |
| } |
| |
void Controller::DoPrintLogPrefix(std::ostream& os) const {
    const std::string* pRID = nullptr;
    if (!request_id().empty()) {
        pRID = &request_id();
    }
    if (pRID) {
        if (FLAGS_log_as_json) {
            os << "\"" BRPC_REQ_ID "\":\"" << *pRID << "\",";
        } else {
            os << BRPC_REQ_ID BRPC_KV_SEP << *pRID << " ";
        }
    }
    if (FLAGS_log_as_json) {
        os << "\"M\":\"";
    }
}
| |
| } // namespace brpc |