blob: 994e5c0c73ff80478089d1d25392f4ff7a260b54 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "client_connection.h"
using namespace tubemq;
const uint32_t ClientConnection::kConnnectMaxTimeMs;
void ClientConnection::AsyncWrite(RequestContextPtr& req) {
auto self = shared_from_this();
executor_->Post([self, this, req]() {
if (request_list_.find(req->request_id_) != request_list_.end()) {
LOG_ERROR("Write requestid[%d] is repeat", req->request_id_);
return;
}
auto& transport_req = request_list_[req->request_id_];
transport_req.req_ = req;
bool queue_empty = write_queue_.empty();
write_queue_.push_back(req->request_id_);
if (req->timeout_ > 0) {
transport_req.deadline_ = executor_->CreateSteadyTimer();
transport_req.deadline_->expires_after(std::chrono::milliseconds(req->timeout_));
transport_req.deadline_->async_wait(std::bind(&ClientConnection::requestTimeoutHandle,
shared_from_this(), std::placeholders::_1,
transport_req.req_));
}
if (IsConnected() && queue_empty) {
asyncWrite();
}
});
}
void ClientConnection::Close() {
auto self = shared_from_this();
executor_->Post([self, this]() { close(); });
}
void ClientConnection::requestTimeoutHandle(const std::error_code& ec, RequestContextPtr req) {
if (ec) {
return;
}
auto request_id = req->request_id_;
auto err = ErrorCode(err_code::kErrNetWorkTimeout, "Request is timeout");
requestCallback(request_id, &err);
}
void ClientConnection::close(const std::error_code* err) {
if (IsStop()) {
return;
}
status_ = kDisconnected;
LOG_INFO("%scloseed", ToString().c_str());
socket_->close();
if (notifier_ != nullptr) {
notifier_(err);
}
releaseAllRequest(err);
}
void ClientConnection::releaseAllRequest(const std::error_code* err) {
std::string msg = "connect close ";
if (err != nullptr) {
msg += "error_code, value:";
msg += std::to_string(err->value());
msg += ", msg:";
msg += err->message();
msg += ", category:";
msg += err->category().name();
}
auto terr = ErrorCode(err_code::kErrNetworkError, msg);
for (auto& it : request_list_) {
it.second.req_->promise_.SetFailed(terr);
it.second.deadline_->cancel();
}
request_list_.clear();
write_queue_.clear();
recv_buffer_->Reset();
}
void ClientConnection::connect(const asio::ip::tcp::resolver::results_type& endpoints) {
if (IsStop()) {
return;
}
status_ = kConnecting;
deadline_->expires_after(std::chrono::milliseconds(kConnnectMaxTimeMs));
deadline_->async_wait(std::bind(&ClientConnection::checkDeadline, this, std::placeholders::_1));
asio::async_connect(
*socket_, endpoints, [this](std::error_code ec, asio::ip::tcp::endpoint endpoint) {
deadline_->cancel();
if (ec) {
status_ = kDisconnected;
LOG_ERROR("%s[%s:%d]async connect error:%d, %s, %s", ToString().c_str(), ip_.c_str(),
port_, ec.value(), ec.message().c_str(), ec.category().name());
close(&ec);
return;
}
status_ = kConnected;
socket_->set_option(asio::ip::tcp::no_delay(true));
// socket_->set_option(asio::ip::tcp::socket::reuse_address(true));
contextString();
LOG_INFO("%sis connected", ToString().c_str());
asyncWrite();
asyncRead();
});
}
void ClientConnection::checkDeadline(const std::error_code& ec) {
if (ec) {
return;
}
if (IsStop()) {
return;
}
LOG_ERROR("%s connect timeout", ToString().c_str());
close();
}
void ClientConnection::contextString() {
std::stringstream stream;
stream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() << "] ";
context_string_ += stream.str();
}
void ClientConnection::asyncRead() {
if (IsStop()) {
return;
}
if (recv_buffer_->capacity() > rpc_config::kRpcRecvBufferMaxBytes) {
LOG_ERROR("%sbuffer capacity over config:%d", ToString().c_str(),
rpc_config::kRpcRecvBufferMaxBytes);
close();
return;
}
recv_buffer_->EnsureWritableBytes(rpc_config::kRpcEnsureWriteableBytes);
auto self = shared_from_this();
socket_->async_receive(
asio::buffer(recv_buffer_->WriteBegin(), recv_buffer_->WritableBytes()),
[self, this](std::error_code ec, std::size_t len) {
if (ec) {
LOG_ERROR("[%s]async read error:%d, %s, %s", ToString().c_str(), ec.value(),
ec.message().c_str(), ec.category().name());
close(&ec);
return;
}
if (len == 0) {
LOG_ERROR("[%s]async read zero", ToString().c_str());
close(&ec);
return;
}
recv_time_ = std::time(nullptr);
recv_buffer_->WriteBytes(len);
std::error_code error;
size_t availsize = socket_->available(error);
if (availsize > 0 && !error) {
recv_buffer_->EnsureWritableBytes(availsize);
size_t rlen = socket_->receive(asio::buffer(recv_buffer_->WriteBegin(), availsize));
if (rlen > 0) {
recv_buffer_->WriteBytes(rlen);
}
}
while (checkPackageDone() > 0 && recv_buffer_->length() > 0) {
}
asyncRead();
});
}
int ClientConnection::checkPackageDone() {
if (check_ == nullptr) {
recv_buffer_->Reset();
LOG_ERROR("check codec func not set");
return -1;
}
if (package_length_ > recv_buffer_->length()) {
return 0;
}
uint32_t request_id = 0;
bool has_request_id = false;
Any check_out;
auto buff = recv_buffer_->Slice();
size_t package_length = 0;
auto result = check_(buff, check_out, request_id, has_request_id, package_length);
if (result < 0) {
package_length_ = 0;
LOG_ERROR("%s, check codec package result:%d", ToString().c_str(), result);
close();
return -1;
}
if (result == 0) {
package_length_ = package_length;
return 0;
}
++read_package_number_;
package_length_ = 0;
recv_buffer_->Skip(result);
if (!has_request_id) {
auto it = request_list_.begin();
if (it == request_list_.end()) {
LOG_ERROR("%s, not find request", ToString().c_str());
return 1;
}
requestCallback(it->first, nullptr, &check_out);
return 1;
}
requestCallback(request_id, nullptr, &check_out);
return 1;
}
void ClientConnection::requestCallback(uint32_t request_id, ErrorCode* err, Any* check_out) {
auto it = request_list_.find(request_id);
if (it == request_list_.end()) {
LOG_INFO("%srequest[%d] not find from request_list_", ToString().c_str(), request_id);
return;
}
auto req = &it->second;
req->deadline_->cancel();
if (err != nullptr) {
LOG_ERROR("%srequest[%d] error:%d, msg:%s", ToString().c_str(), request_id, err->Value(),
err->Message().c_str());
req->req_->promise_.SetFailed(*err);
request_list_.erase(it);
return;
}
if (check_out != nullptr) {
ResponseContext rsp;
BufferPtr* buff = any_cast<BufferPtr>(check_out);
if (buff != nullptr) {
req->req_->codec_->Decode(*buff, request_id, rsp.rsp_);
} else {
rsp.rsp_ = *check_out;
}
req->req_->promise_.SetValue(rsp);
} else {
req->req_->promise_.SetFailed(ErrorCode(err_code::kErrNetworkError, "response is null"));
}
request_list_.erase(it);
}
TransportRequest* ClientConnection::nextTransportRequest() {
uint32_t request_id;
TransportRequest* transport_req = nullptr;
while (!write_queue_.empty()) {
request_id = write_queue_.front();
write_queue_.pop_front();
auto it = request_list_.find(request_id);
if (it == request_list_.end()) {
continue;
}
transport_req = &it->second;
break;
}
return transport_req;
}
void ClientConnection::asyncWrite() {
if (IsStop()) {
return;
}
auto transport_req = nextTransportRequest();
if (transport_req == nullptr) {
return;
}
auto self = shared_from_this();
auto& req = transport_req->req_;
asio::async_write(
*socket_,
asio::buffer(transport_req->req_->buf_->data(), transport_req->req_->buf_->length()),
[self, this, req](std::error_code ec, std::size_t length) {
if (ec) {
close(&ec);
LOG_ERROR("[%s]async write error:%d, message:%s, category:%s", ToString().c_str(),
ec.value(), ec.message().c_str(), ec.category().name());
return;
}
++write_package_number_;
LOG_TRACE("[%s]async write done, request_id:%d", ToString().c_str(), req->request_id_);
asyncWrite();
});
}