blob: 9a575f1f6aa38180620dd7d20c6b3f8eef9b0477 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "pack_queue.h"
#include <cstdlib>
#include <functional>
#include "proxylist_config.h"
#include "sdk_constant.h"
#include "sdk_core.h"
#include "logger.h"
#include "msg_protocol.h"
#include "send_buffer.h"
#include "socket_connection.h"
#include "tc_api.h"
#include "utils.h"
#include "msg_protocol.h"
namespace dataproxy_sdk
{
PackQueue::PackQueue(const std::string &inlong_group_id, const std::string &inlong_stream_id)
: cur_len_(0), inlong_group_id_(inlong_group_id), inlong_stream_id_(inlong_stream_id), groupId_num_(0), streamId_num_(0), msg_type_(g_config.msg_type_), data_capacity_(g_config.buf_size_)
{
data_ = new char[data_capacity_];
memset(data_, 0x0, data_capacity_);
topic_desc_ = "groupId=" + inlong_group_id_ + "&streamId=" + inlong_stream_id_;
first_use_ = Utils::getCurrentMsTime();
last_use_ = Utils::getCurrentMsTime();
data_time_ = 0;
}
PackQueue::~PackQueue()
{
if (data_)
{
delete[] data_;
data_ = nullptr;
}
}
int32_t PackQueue::sendMsg(const std::string &msg,
const std::string &inlong_group_id,
const std::string &inlong_stream_id,
const std::string &client_ip,
uint64_t report_time,
UserCallBack call_back)
{
std::lock_guard<std::mutex> lck(mutex_);
//pack previous
if (isTriggerPack(report_time, msg.size()))
{
int32_t res = writeToBuf();
if (res)
{
increasePackErr();
return res;
}
}
//write data to packqueue
int32_t append_ret = appendMsg(msg, client_ip, report_time, call_back);
if (append_ret)
{
LOG_ERROR("fail to write to pack queue, inlong_group_id: %s, inlong_stream_id: %s", inlong_group_id.c_str(), inlong_stream_id.c_str());
return append_ret;
}
//if uneable_pack, single msg is written to packqueue and directly sent to buf
if (!g_config.enable_pack_)
{
int32_t res = writeToBuf();
if (res)
{
increasePackErr();
return res;
}
}
return 0;
}
//send pack data to buf
int32_t PackQueue::writeToBuf()
{
if (inlong_group_id_.empty())
{
LOG_ERROR("something is wrong, check!!");
return SDKInvalidResult::kFailGetConn;
}
if (msg_set_.empty())
{
LOG_ERROR("no msg in msg_set, check!");
return SDKInvalidResult::kFailGetPackQueue;
}
auto conn = g_clusters->getSendConn(inlong_group_id_);
if (!conn)
{
LOG_ERROR("no avaliable connection for inlong_group_id: %s, try later", inlong_group_id_.c_str());
return SDKInvalidResult::kFailGetConn;
}
auto pool = g_pools->getPool(inlong_group_id_);
if (!pool)
{
return SDKInvalidResult::kFailGetBufferPool;
}
SendBuffer *send_buf = nullptr;
int32_t res = pool->getSendBuf(send_buf);
if (res)
{
return res;
}
if (!send_buf)
{
LOG_ERROR("failed to get send_buf, something gets wrong, checkout!");
return SDKInvalidResult::kFailGetSendBuf;
}
//lock sendbuf and write pack data to sendbuf
{
std::lock_guard<std::mutex> buf_lck(send_buf->mutex_);
uint32_t len = 0;
int32_t msg_cnt = msg_set_.size();
// std::string msg_groupid = inlong_group_id_;
uint32_t uniq_id = g_send_msgid.incrementAndGet();
if (!packOperate(send_buf->content(), len, uniq_id) || len == 0)
{
LOG_ERROR("failed to write data to send buf from pack queue, pool id:%d, buf id:%d", pool->poolId(), pool->writeId());
return SDKInvalidResult::kFailWriteToBuf;
}
send_buf->setLen(len);
send_buf->setMsgCnt(msg_cnt);
send_buf->setGroupid(inlong_group_id_);
send_buf->setStreamid(inlong_stream_id_);
send_buf->setUniqId(uniq_id);
send_buf->setTarget(conn);
send_buf->setIsPacked(true);
for (auto it : msg_set_)
{
send_buf->addUserMsg(it);
}
}
pack_num_.increment();
g_pools->addUid2BufPool(send_buf->uniqId(),pool);//used for ack finding
resetPackQueue();
pool->sendBufToConn(send_buf);
return 0;
}
int32_t PackQueue::appendMsg(const std::string &msg, std::string client_ip, int64_t report_time, UserCallBack call_back)
{
//too long msg
if (msg.size() > g_config.ext_pack_size_)
{
LOG_ERROR("msg len (%d) more than ext_pack_size (%d)", msg.size(), g_config.ext_pack_size_);
return SDKInvalidResult::kMsgTooLong;
}
//if datatime is illegal, fix it using current time
if (Utils::isLegalTime(report_time))
data_time_ = report_time;
else
{
data_time_ = Utils::getCurrentMsTime();
pack_redotime_cnt_.increment();
}
//used for callback
if (call_back)
{
std::string user_client_ip = client_ip;
int64_t user_report_time = report_time;
if (client_ip.empty())
{
client_ip = "127.0.0.1";
}
std::string data_pack_format_attr = "__addcol1__reptime=" + Utils::getFormatTime(data_time_) + "&__addcol2__ip=" + client_ip;
msg_set_.emplace_back(std::make_shared<UserMsg>(msg, client_ip, data_time_, call_back, data_pack_format_attr, user_client_ip, user_report_time));
}
cur_len_ += msg.size() + 1; // '\n' using one byte
if (g_config.isNormalDataPackFormat())
{
cur_len_ += 4;
}
if (g_config.isAttrDataPackFormat())
{
cur_len_ += constants::kAttrLen + 8;
}
//update last using time
last_use_ = Utils::getCurrentMsTime();
return 0;
}
/**
* @description: whether trigger pack data
* @param {uint64_t} report_time
* @param {int32_t} msg_len
*/
bool PackQueue::isTriggerPack(uint64_t report_time, int32_t msg_len)
{
if (0 == cur_len_ || msg_set_.empty())
return false;
if (!Utils::isLegalTime(report_time))
{
report_time = Utils::getCurrentMsTime();
}
int64_t max_pack_time_interval = 1800000; // FIXME: use user config?
bool time_trigger = false; //timeout trigger
bool len_trigger = false; //content trigger
if (llabs(report_time - data_time_) > max_pack_time_interval ||
(report_time != data_time_ && report_time / 1000 / 3600 != data_time_ / 1000 / 3600))
{
time_trigger = true;
}
if (msg_len + cur_len_ > g_config.pack_size_)
{
len_trigger = true;
}
return (time_trigger || len_trigger);
}
/**
* @description: do pack operate
* @param {int8*} pack_data: packed binary data
* @param {uint32_t&} out_len
* @return {*} true if pack successfully
*/
bool PackQueue::packOperate(char *pack_data, uint32_t &out_len, uint32_t uniq_id)
{
if (!pack_data)
{
LOG_ERROR("nullptr, failed to allocate memory for buf");
return false;
}
//add body into data_, then zip and copy to buffer
uint32_t idx = 0;
for (auto &it : msg_set_)
{
//msg>=5,body format: data_len|data
if (msg_type_ >= 5) //add data_len
{
*(uint32_t *)(&data_[idx]) = htonl(it->msg.size());
idx += sizeof(uint32_t);
}
//add data
memcpy(&data_[idx], it->msg.data(), it->msg.size());
idx += static_cast<uint32_t>(it->msg.size());
//add attrlen|attr
if (g_config.isAttrDataPackFormat())
{
*(uint32_t *)(&data_[idx]) = htonl(it->data_pack_format_attr.size());
idx += sizeof(uint32_t);
memcpy(&data_[idx], it->data_pack_format_attr.data(), it->data_pack_format_attr.size());
idx += static_cast<uint32_t>(it->data_pack_format_attr.size());
}
// msgtype = 2/3 support '\n'
if (msg_type_ == 2 || msg_type_ == 3)
{
data_[idx] = '\n';
++idx;
}
}
//preprocess attr
uint32_t cnt = 1;
if (msg_set_.size())
{
cnt = msg_set_.size();
}
//pack
if (msg_type_ >= constants::kBinPackMethod)
{
char *bodyBegin = pack_data + sizeof(BinaryMsgHead) + sizeof(uint32_t); //head+body_len
uint32_t body_len = 0;
std::string snappy_res;
bool isSnappy = isZipAndOperate(snappy_res, idx);
char real_msg_type;
if (isSnappy) // need zip
{
body_len = static_cast<uint32_t>(snappy_res.size());
memcpy(bodyBegin, snappy_res.data(), body_len); //copy data to buf
// msg_type
real_msg_type = (msg_type_ | constants::kBinSnappyFlag);
}
else
{
body_len = idx;
memcpy(bodyBegin, data_, body_len);
real_msg_type = msg_type_;
}
*(uint32_t *)(&(pack_data[sizeof(BinaryMsgHead)])) = htonl(body_len); //set bodylen
bodyBegin += body_len;
// groupid_num、streamid_num、ext_field、data_time、cnt、uniq
uint32_t char_groupid_flag = 0;
std::string groupid_streamid_char;
uint16_t groupid_num = 0, streamid_num = 0;
if (g_config.enableCharGroupid() || groupId_num_ == 0 || streamId_num_ == 0) //using string groupid and streamid
{
groupid_num = 0;
streamid_num = 0;
groupid_streamid_char = topic_desc_;
char_groupid_flag = 0x4;
}
else
{
groupid_num = groupId_num_;
streamid_num = streamId_num_;
}
uint16_t ext_field = (g_config.extend_field_ | char_groupid_flag);
uint32_t data_time = data_time_ / 1000;
// attr
std::string attr;
if (g_config.enableTraceIP())
{
if (groupid_streamid_char.empty())
attr = "node1ip=" + g_config.ser_ip_ + "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
else
attr = groupid_streamid_char + "&node1ip=" + g_config.ser_ip_ + "&rtime1=" + std::to_string(Utils::getCurrentMsTime());
}
else
{
attr = topic_desc_;
}
// attrlen
*(uint16_t *)bodyBegin = htons(attr.size());
bodyBegin += sizeof(uint16_t);
// attr
memcpy(bodyBegin, attr.data(), attr.size());
bodyBegin += attr.size();
// magic
*(uint16_t *)bodyBegin = htons(constants::kBinaryMagic);
uint32_t total_len = 25 + body_len + attr.size();
// header
char *p = pack_data;
*(uint32_t *)p = htonl(total_len);
p += 4;
*p = real_msg_type;
++p;
*(uint16_t *)p = htons(groupid_num);
p += 2;
*(uint16_t *)p = htons(streamid_num);
p += 2;
*(uint16_t *)p = htons(ext_field);
p += 2;
*(uint32_t *)p = htonl(data_time);
p += 4;
*(uint16_t *)p = htons(cnt);
p += 2;
*(uint32_t *)p = htonl(uniq_id);
out_len = total_len + 4;
}
else
{
if (msg_type_ == 3 || msg_type_ == 2)
{
--idx;
}
// body whether needs zip
char *bodyBegin = pack_data + sizeof(ProtocolMsgHead) + sizeof(uint32_t);
uint32_t body_len = 0;
std::string snappy_res;
bool isSnappy = isZipAndOperate(snappy_res, idx);
if (isSnappy)
{
body_len = static_cast<uint32_t>(snappy_res.size());
memcpy(bodyBegin, snappy_res.data(), body_len); //copy
}
else
{
body_len = idx;
memcpy(bodyBegin, data_, body_len);
}
*(uint32_t *)(&(pack_data[sizeof(ProtocolMsgHead)])) = htonl(body_len); //set bodylen
bodyBegin += body_len;
// attr
std::string attr;
attr = topic_desc_;
attr += "&dt=" + std::to_string(data_time_);
attr += "&mid=" + std::to_string(uniq_id);
if (isSnappy)
attr += "&cp=snappy";
attr += "&cnt=" + std::to_string(cnt);
attr += "&sid=" + std::string(Utils::getSnowflakeId());
if (g_config.is_from_DC_)
{ //&__addcol1_reptime=yyyymmddHHMMSS&__addcol2__ip=BBB&f=dc
attr += "&__addcol1_reptime=" + Utils::getFormatTime(Utils::getCurrentMsTime()) + "&__addcol2__ip=" + g_config.ser_ip_ + "&f=dc";
}
// attrlen
*(uint32_t *)bodyBegin = htonl(attr.size());
bodyBegin += sizeof(uint32_t);
// attr
memcpy(bodyBegin, attr.data(), attr.size());
// total_len
uint32_t total_len = 1 + 4 + body_len + 4 + attr.size();
*(uint32_t *)pack_data = htonl(total_len);
// msg_type
*(&pack_data[4]) = msg_type_;
LOG_TRACE("after packoperate: total_len:%d, body_len:%d, attr_len:%d", total_len, body_len, attr.size());
out_len = total_len + 4;
}
return true;
}
/**
* @description: whether body data should be zipped; if needs, zip data and save as res
* @param {string&} res
* @param {uint32_t} real_cur_len: data len before zip
* @return {*} true if needing zip
*/
bool PackQueue::isZipAndOperate(std::string &res, uint32_t real_cur_len)
{
if (g_config.enable_zip_ && real_cur_len > g_config.min_zip_len_)
{
LOG_TRACE("start snappy.");
Utils::zipData(data_, real_cur_len, res);
return true;
}
else
return false;
}
void PackQueue::checkQueue(bool isLastPack)
{
if (cur_len_ == 0 || msg_set_.empty())
return;
//no timeout, and it isn't last packing
if (Utils::getCurrentMsTime() - first_use_ < g_config.pack_timeout_ && !isLastPack)// FIXME:should use first_use instead of last_use?
return;
LOG_TRACE("start auto pack, inlong_group_id:%s, inlong_stream_id:%s", inlong_group_id_.c_str(), inlong_stream_id_.c_str());
std::lock_guard<std::mutex> lck(mutex_);
last_use_ = Utils::getCurrentMsTime();
//send fail, callback and reset
if (writeToBuf())
{
for (auto& it : msg_set_)
{
if (it->cb) { it->cb(inlong_group_id_.data(), inlong_stream_id_.data(), it->msg.data(), it->msg.size(), it->user_report_time, it->user_client_ip.data()); }
}
resetPackQueue();
}
}
GlobalQueues::~GlobalQueues()
{
closeCheckSubroutine();
if (worker_.joinable())
{
worker_.join();
}
for (auto it : queues_)
{
it.second->checkQueue(true);
}
queues_.clear();
}
void GlobalQueues::startCheckSubroutine() { worker_ = std::thread(&GlobalQueues::checkPackQueueSubroutine, this); }
void GlobalQueues::checkPackQueueSubroutine()
{
LOG_INFO("start checkPackQueue subroutine");
while (!exit_flag_)
{
Utils::taskWaitTime(2); // FIXME:improve pack interval
for (auto it : queues_)
{
it.second->checkQueue(exit_flag_);
}
}
LOG_WARN("exit checkPackQueue subroutine");
}
//get pack queue, create if not exists
PackQueuePtr GlobalQueues::getPackQueue(const std::string &inlong_group_id, const std::string &inlong_stream_id)
{
std::lock_guard<std::mutex> lck(mutex_);
auto it = queues_.find(inlong_group_id + inlong_stream_id);
if (it != queues_.end())
return it->second;
else
{
PackQueuePtr p = std::make_shared<PackQueue>(inlong_group_id, inlong_stream_id);
queues_.emplace(inlong_group_id + inlong_stream_id, p);
return p;
}
}
void GlobalQueues::printAck()
{
if (queues_.empty())
return;
for (auto &it : queues_)
{
LOG_STAT("dataproxy_sdk_cpp #local:%s#%s#success send msg:%d", g_config.ser_ip_.c_str(), it.second->topicDesc().c_str(),
it.second->success_num_.getAndSet(0));
}
}
void GlobalQueues::printTotalAck()
{
if (queues_.empty())
return;
for (auto &it : queues_)
{
LOG_STAT("dataproxy_sdk_cpp #local:%s#%s#total success msg:%d", g_config.ser_ip_.c_str(), it.second->topicDesc().c_str(),
it.second->total_success_num_.get());
}
}
void GlobalQueues::showState()
{
if (1 == user_exit_flag.get())
return;
uint32_t total_pack = 0;
for (auto &it : queues_)
{
uint32_t pack = it.second->pack_num_.getAndSet(0);
total_pack += pack;
LOG_DEBUG("toipc:%s, pack_num:%d", it.second->topicDesc().c_str(), pack);
}
LOG_DEBUG("total_pack:%d", total_pack);
g_pools->showState();
g_executors->showState();
}
} // namespace dataproxy_sdk