/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "buffer_pool.h"
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
#include "proxylist_config.h"
#include "sdk_core.h"
#include "logger.h"
#include "send_buffer.h"
#include "socket_connection.h"
#include "tc_api.h"
namespace dataproxy_sdk
{
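// Pre-allocates buf_num_ SendBuffers of buf_size bytes each and creates the
// ExecutorThread that runs this pool's retry timers.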
BufferPool::BufferPool(uint32_t pool_id, uint32_t buf_num, uint32_t buf_size)
: pool_id_(pool_id), buf_num_(buf_num), read_(0), write_(0), current_use_(0), pool_mutex_(), executor_(std::make_shared<ExecutorThread>(pool_id))
{
buffers_.reserve(buf_num_);
for (uint32_t i = 0; i < buf_num_; i++)
{
buffers_.push_back(new SendBuffer(buf_size));
}
}
BufferPool::~BufferPool()
{
for (auto it : buffers_)
{
delete it;
}
buffers_.clear();
LOG_DEBUG("free send buffer memory, pool(id:%d)", pool_id_);
}
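// Returns true while the pool still has at least one free SendBuffer slot.
// (The identifier spelling matches its declaration in buffer_pool.h.)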
bool BufferPool::isAvaliable()
{
return current_use_ < buf_num_;
}
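// Hands out a free SendBuffer: scans from the write_ cursor for an unused slot,
// marks it used and bumps current_use_. Returns 0 on success, or
// kBufferPoolFull / kFailGetSendBuf on failure.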
int32_t BufferPool::getSendBuf(SendBuffer *&send_buf)
{
std::lock_guard<std::mutex> lck(pool_mutex_);
if (current_use_ >= buf_num_)
{
LOG_ERROR("buffer pool(id:%d) is full", pool_id_);
return SDKInvalidResult::kBufferPoolFull;
}
while (buffers_[write_]->isUsed())
{
write_ = (write_ + 1) % buf_num_;
}
send_buf = buffers_[write_];
// ++write_;
if (!send_buf)
{
LOG_ERROR("failed to get send buf, pool(id:%d)", pool_id_);
return SDKInvalidResult::kFailGetSendBuf;
}
send_buf->setIsUsed(true);
++current_use_;
return 0;
}
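// First send of a buffer: arms the per-buffer retry timer when retries are configured,
// creates a new connection if the current target has stopped, hands the buffer to the
// connection and records it in already_send_buf_list_ until the ack arrives.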
int32_t BufferPool::sendBufToConn(SendBuffer *&send_buf)
{
std::lock_guard<std::mutex> pool_lck(pool_mutex_);
std::lock_guard<std::mutex> buf_lck(send_buf->mutex_);
auto self = shared_from_this();
if (g_config.retry_num_ > 0 && g_config.retry_interval_ > 0) // arm the retry timer if resending is enabled
{
executor_->postTask([self, this, send_buf]
{
send_buf->timeout_timer_ = executor_->createSteadyTimer();
send_buf->timeout_timer_->expires_after(std::chrono::milliseconds(g_config.retry_interval_));
send_buf->timeout_timer_->async_wait(std::bind(&BufferPool::RetryHandler, shared_from_this(), std::placeholders::_1, send_buf));
});
}
if (send_buf->target()->isStop())
{
auto new_conn = g_clusters->createActiveConn(send_buf->inlong_group_id(), pool_id_);
if (!new_conn)
{
LOG_WARN("first send buf, fail to create new conn for sendbuf, inlong_group_id:%s, inlong_stream_id:%s", send_buf->inlong_group_id().c_str(), send_buf->inlong_stream_id().c_str());
send_buf->fail_create_conn_.increment();
// send_buf->increaseRetryNum();
// send_err_.increment();
already_send_buf_list_[send_buf->uniqId()] = send_buf; //preparing for next sending
return -1;
}
send_buf->setTarget(new_conn);
}
send_buf->target()->sendBuf(send_buf); // do the actual send
send_buf->increaseRetryNum();
LOG_TRACE("request buf(id:%d) sent to connection %s for the first time, inlong_group_id:%s, inlong_stream_id:%s", send_buf->uniqId(), send_buf->target()->getRemoteInfo().c_str(),
send_buf->inlong_group_id().c_str(), send_buf->inlong_stream_id().c_str());
already_send_buf_list_[send_buf->uniqId()] = send_buf; //add buf uid into sent list
send_total_.increment();
//update metrics
has_send_buf_.increment();
waiting_ack_.increment();
LOG_TRACE("success to write buf pool, pool(id:%d), buf(uid:%d), inlong_group_id:%s, inlong_stream_id:%s", pool_id_, send_buf->uniqId(), send_buf->inlong_group_id().c_str(),
send_buf->inlong_stream_id().c_str());
return 0;
}
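// Fired by the per-buffer steady timer. A cancelled timer (ec set) means the buffer was
// acked or, for msg_type 2, can be acked directly. Otherwise the ack timed out: give up
// and run the user callback once retry_num / kMaxRetryConnection is reached, or resend
// the buffer and re-arm the timer.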
void BufferPool::RetryHandler(const std::error_code &ec, SendBuffer *buf)
{
if (ec) // timer was cancelled; two cases: 1. ack received -> sendbuf.reset; 2. msg_type=2 -> conn.doWrite cancels the timer
{
if (g_config.msg_type_ == 2)
{
LOG_TRACE("msg_type is 2, no need ackmsg, clear buf(uid:%d) directly", buf->uniqId());
ackBuf(buf->uniqId());
}
return;
}
// buf was already acked before the retry fired
if (!buf->isUsed())
{
return;
}
std::lock_guard<std::mutex> buf_lck(buf->mutex_);
if (buf->getAlreadySend() == 2)
{
LOG_INFO("buf(id:%d, inlong_group_id:%s, inlong_stream_id:%s) ackmsg timeout, send %d times(max retry_num:%d)", buf->uniqId(), buf->inlong_group_id().c_str(),
buf->inlong_stream_id().c_str(), buf->getAlreadySend(), g_config.retry_num_);
}
else
{
LOG_DEBUG("buf(id:%d, inlong_group_id:%s, inlong_stream_id:%s) ackmsg timeout, send %d times(max retry_num:%d)", buf->uniqId(), buf->inlong_group_id().c_str(),
buf->inlong_stream_id().c_str(), buf->getAlreadySend(), g_config.retry_num_);
}
// reached max retry_num (or the connection-create limit): give up and run the user callback
if (buf->getAlreadySend() >= g_config.retry_num_ || buf->fail_create_conn_.get() >= constants::kMaxRetryConnection)
{
LOG_WARN("failed to send buf(id:%d, inlong_group_id:%s, inlong_stream_id:%s), already sent max_retry_num(%d) times, starting usercallback", buf->uniqId(), buf->inlong_group_id().c_str(),
buf->inlong_stream_id().c_str(), g_config.retry_num_);
buf->doUserCallBack();
buf->reset();
}
else // ack timeout, resend
{
if (!buf->target() || buf->target()->isStop())
{
auto new_conn = g_clusters->createActiveConn(buf->inlong_group_id(), pool_id_); // TODO: better to pick an existing active conn instead of creating a new one?
if (!new_conn) // conn creation failed; wait for the next retry
{
LOG_INFO("fail to create new conn to send buf, inlong_group_id:%s, inlong_stream_id:%s", buf->inlong_group_id().c_str(), buf->inlong_stream_id().c_str());
buf->fail_create_conn_.increment();
buf->timeout_timer_->expires_after(std::chrono::milliseconds(g_config.retry_interval_));
buf->timeout_timer_->async_wait(std::bind(&BufferPool::RetryHandler, shared_from_this(), std::placeholders::_1, buf));
return;
}
buf->setTarget(new_conn);
}
buf->target()->sendBuf(buf);
buf->increaseRetryNum();
buf->timeout_timer_->expires_after(std::chrono::milliseconds(g_config.retry_interval_));
buf->timeout_timer_->async_wait(std::bind(&BufferPool::RetryHandler, shared_from_this(), std::placeholders::_1, buf));
}
}
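// Handles an ack for buffer uniq_id: updates success metrics (pool and packqueue),
// resets the buffer, removes it from already_send_buf_list_ and frees its pool slot.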
void BufferPool::ackBufHelper(uint32_t uniq_id)
{
std::lock_guard<std::mutex> lck(pool_mutex_);
if (already_send_buf_list_.find(uniq_id) == already_send_buf_list_.end())
{
LOG_ERROR("no buf(uid:%d) in already_send_buf_list", uniq_id);
return;
}
auto &buf2ack = already_send_buf_list_[uniq_id];
std::lock_guard<std::mutex> buf_lck(buf2ack->mutex_);
int32_t msg_cnt = buf2ack->msgCnt();
success_ack_.add(msg_cnt);
// update packqueue metrics
if (g_queues)
{
auto packqueue = g_queues->getPackQueue(buf2ack->inlong_group_id(), buf2ack->inlong_stream_id());
packqueue->success_num_.add(msg_cnt);
packqueue->total_success_num_.add(msg_cnt);
}
already_send_buf_list_[uniq_id]->reset();
already_send_buf_list_.erase(uniq_id);
--current_use_;
waiting_ack_.decrement();
has_ack_.increment();
LOG_TRACE("pool(id:%d) success ack msg cumulative num: %d", pool_id_, success_ack_.get());
}
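// Logs this pool's usage and ack counters, tagged with an inlong_group_id when one is given.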
void BufferPool::showState(const std::string &inlong_group_id)
{
if (inlong_group_id.empty())
{
LOG_STAT("STATE|pool_id:%d, current_use:%d(total:%d), invoke_send_buf:%d, has_ack:%d, waiting_ack:%d", pool_id_, currentUse(),
g_config.bufNum(), has_send_buf_.get(), has_ack_.get(), waiting_ack_.get());
}
else
{
LOG_STAT("STATE|inlong_group_id:%s, pool_id:%d, current_use:%d(total:%d), invoke_send_buf:%d, has_ack:%d, waiting_ack:%d", inlong_group_id.c_str(), pool_id_, currentUse(),
g_config.bufNum(), has_send_buf_.get(), has_ack_.get(), waiting_ack_.get());
}
}
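// Stops and releases this pool's executor thread.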
void BufferPool::close()
{
if (executor_ != nullptr)
{
executor_->close();
}
executor_.reset();
}
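// Builds the buffer pools: one pool set per inlong_group_id when groupId isolation is
// enabled, otherwise a shared set of pools used round-robin.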
TotalPools::TotalPools() : next_(0), mutex_()
{
if (g_config.enable_groupId_isolation_) // data of different groupids go to separate buffer pools
{
for (size_t i = 0; i < g_config.inlong_group_ids_.size(); i++) // create a buffer pool set for every groupid
{
std::vector<BufferPoolPtr> groupid_pool;
groupid_pool.reserve(g_config.buffer_num_per_groupId_);
for (int32_t j = 0; j < g_config.buffer_num_per_groupId_; j++)
{
groupid_pool.push_back(std::make_shared<BufferPool>(j, g_config.bufNum(), g_config.buf_size_));
}
groupid2pool_map_[g_config.inlong_group_ids_[i]] = groupid_pool;
groupid2next_[g_config.inlong_group_ids_[i]] = 0;
}
}
else // round-robin
{
pools_.reserve(g_config.shared_buf_nums_);
for (int i = 0; i < g_config.shared_buf_nums_; i++)
{
pools_.push_back(std::make_shared<BufferPool>(i, g_config.bufNum(), g_config.buf_size_));
}
}
}
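// Round-robin picks the next available pool: from the group's own pool set when isolation
// is enabled, otherwise from the shared set. Returns nullptr when no pool has free buffers.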
BufferPoolPtr TotalPools::getPool(const std::string &inlong_group_id)
{
if (g_config.enable_groupId_isolation_) // groupid isolate
{
auto groupid_pool = groupid2pool_map_.find(inlong_group_id);
if (groupid_pool == groupid2pool_map_.end() || groupid_pool->second.empty())
{
LOG_ERROR("fail to get bufferpool, inlong_group_id:%s", inlong_group_id.c_str());
return nullptr;
}
if (groupid2next_.find(inlong_group_id) == groupid2next_.end())
{
groupid2next_[inlong_group_id] = 0;
}
auto &pool_set = groupid_pool->second;
size_t idx = 0;
for (size_t i = 0; i < pool_set.size(); i++)
{
idx = (groupid2next_[inlong_group_id]++) % pool_set.size();
if (pool_set[idx]->isAvaliable())
{
return pool_set[idx];
}
}
return nullptr;
}
else
{
if (pools_.empty())
{
LOG_ERROR("fail to get bufferpool, allocate error in tc_init");
return nullptr;
}
size_t idx = 0;
for (size_t i = 0; i < pools_.size(); i++)
{
idx = (next_++) % pools_.size();
if (pools_[idx]->isAvaliable())
{
return pools_[idx];
}
}
return nullptr;
}
}
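// Returns true if any pool that can serve this inlong_group_id still has a free buffer.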
bool TotalPools::isPoolAvailable(const std::string &inlong_group_id)
{
if (g_config.enable_groupId_isolation_) // groupid_isolation
{
auto groupid_pool = groupid2pool_map_.find(inlong_group_id);
if (groupid_pool == groupid2pool_map_.end())
{
LOG_ERROR("no buffer allocated to inlong_group_id:%s, check config", inlong_group_id.c_str());
return false;
}
for (size_t i = 0; i < groupid_pool->second.size(); i++)
{
if (groupid_pool->second[i]->isAvaliable())
{
return true;
}
}
return false;
}
else // round-robin over the shared pools
{
for (size_t i = 0; i < pools_.size(); i++)
{
if (pools_[i]->isAvaliable())
{
return true;
}
}
return false;
}
}
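// Remembers which pool owns the buffer with this uid so that later lookups (e.g. on ack)
// can route back to it.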
void TotalPools::addUid2BufPool(uint32_t uniqId, BufferPoolPtr &bpr)
{
std::lock_guard<std::mutex> lck(mutex_);
uid2buf_pool_[uniqId] = bpr;
}
// Looks up and removes the pool that owns the buffer with this uid; returns nullptr if the uid is unknown.
BufferPoolPtr TotalPools::getUidBufPool(uint32_t uniqId)
{
std::lock_guard<std::mutex> lck(mutex_);
auto iter = uid2buf_pool_.find(uniqId);
if (iter == uid2buf_pool_.end())
{
return nullptr;
}
auto pool = iter->second; // copy out before erase: the iterator is invalid afterwards
uid2buf_pool_.erase(iter); // TODO: need to add it back into the buf pool after the ack operation
return pool;
}
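// Logs statistics for every pool, grouped per inlong_group_id when isolation is enabled.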
void TotalPools::showState()
{
if (g_config.enable_groupId_isolation_)
{
for (auto &groupid_pool : groupid2pool_map_)
{
showStateHelper(groupid_pool.first, groupid_pool.second);
}
}
else
{
showStateHelper("", pools_);
}
}
void TotalPools::showStateHelper(const std::string &inlong_group_id, std::vector<BufferPoolPtr> &pools)
{
for (auto &pool : pools)
{
pool->showState(inlong_group_id);
}
}
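// If buffers are still in flight, waits 10 seconds for them to be acked, then closes every pool.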
void TotalPools::close()
{
// wait for the remaining data to be sent and acked
if (!uid2buf_pool_.empty())
{
LOG_INFO("waiting 10s for remaining msgs to be acked");
std::this_thread::sleep_for(std::chrono::seconds(10));
}
for (auto &pool : pools_)
{
pool->close();
}
for (auto &groupid_pool : groupid2pool_map_)
{
for (auto &pool : groupid_pool.second)
{
pool->close();
}
}
}
} // namespace dataproxy_sdk