// blob: 714fb7091cad237535fd6acf5a3f3faac4da329f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <asio.hpp>
#include <string>
#include <system_error>
#include "socket_connection.h"
using namespace std;
using namespace dataproxy_sdk;
/**
 * Test connection that sends one pre-serialized buffer to a TCP endpoint a
 * configurable number of times and keeps reading whatever the peer sends back.
 *
 * Completion handlers for read/write capture a shared_ptr to the connection so
 * the object stays alive while an operation is pending.
 *
 * NOTE(review): the connect handler captures only `this`, because
 * shared_from_this() cannot be used while the constructor is still running.
 * The owner must therefore keep the connection alive at least until the
 * connect attempt has completed.
 */
class BigSizeConn : public enable_shared_from_this<BigSizeConn>
{
  private:
    ExecutorThreadPtr executor_;  // creates the socket and runs tasks posted by writeData()
    string ip_;                   // peer address in textual form
    uint16_t port_;               // peer port
    TcpSocketPtr socket_;         // socket obtained from executor_
    bool isConnect_;              // true between a successful connect and doClose()
    char* data_;                  // caller-owned buffer to send (not copied)
    uint32_t len_;                // number of bytes of data_ to send per round
    uint32_t runtimes_;           // remaining number of rounds to send
    uint32_t count_;              // number of rounds started so far
    RecvBufferPtr recv_buf_;      // accumulates everything received from the peer

  public:
    /**
     * Creates the socket and immediately starts an asynchronous connect.
     *
     * @param executor executor used to create the socket and to post tasks
     * @param ip       peer address; asio::ip::address::from_string throws if it cannot be parsed
     * @param port     peer port
     */
    BigSizeConn(ExecutorThreadPtr& executor, string ip, uint16_t port)
        : executor_(executor)
        , ip_(std::move(ip))
        , port_(port)
        , socket_(std::move(executor_->createTcpSocket()))
        , isConnect_(false)
        , data_(nullptr)
        , len_(0)
        , runtimes_(10)
        , count_(0)
        , recv_buf_(std::make_shared<RecvBuffer>())
    {
        asio::ip::tcp::endpoint ep(asio::ip::address::from_string(ip_), port_);
        doConnect(ep);
    }

    /**
     * Starts one asynchronous receive and re-arms itself after every
     * successful read. Any error (or a zero-length read) closes the connection.
     */
    void doRead()
    {
        if (!isConnect_) return;

        recv_buf_->EnsureWritableBytes(1024);
        auto self = shared_from_this();
        socket_->async_receive(
            asio::buffer(recv_buf_->WriteBegin(), recv_buf_->WritableBytes()),
            [self, this](const std::error_code& ec, std::size_t len) {
                if (ec)
                {
                    LOG_ERROR("async read data error:%s", ec.message().c_str());
                    doClose(&ec);
                    return;
                }
                if (len == 0)
                {
                    LOG_ERROR("async read 0 bytes");
                    doClose(&ec);
                    return;
                }
                recv_buf_->WriteBytes(len);

                // Read the remaining data that is already queued on the socket, if any.
                std::error_code drain_error;
                const size_t left_size = socket_->available(drain_error);
                if (left_size > 0 && !drain_error)
                {
                    recv_buf_->EnsureWritableBytes(left_size);
                    // Use the non-throwing overload: an exception must not escape
                    // from a completion handler.
                    const size_t rlen =
                        socket_->receive(asio::buffer(recv_buf_->WriteBegin(), left_size), 0, drain_error);
                    if (rlen > 0) { recv_buf_->WriteBytes(rlen); }
                    if (drain_error)
                    {
                        LOG_ERROR("sync read data error:%s", drain_error.message().c_str());
                        doClose(&drain_error);
                        return;
                    }
                }

                LOG_DEBUG("conn read %zu bytes", static_cast<size_t>(recv_buf_->length()));
                doRead();
            });
    }

    /**
     * Starts an asynchronous connect to `ep`; on success marks the connection
     * as connected and starts reading.
     */
    void doConnect(const asio::ip::tcp::endpoint& ep)
    {
        socket_->async_connect(ep, [this](const std::error_code& ec) {
            if (ec)
            {
                isConnect_ = false;
                LOG_ERROR("async connetc error");
                doClose(&ec);
                return;
            }
            isConnect_ = true;
            doRead();
        });
    }

    /**
     * Marks the connection as disconnected and closes the socket if it is
     * still open. Safe to call repeatedly.
     *
     * @param err error that triggered the close (currently unused)
     */
    void doClose(const std::error_code* err)
    {
        (void)err;
        // Clear the flag first so no further read/write round is started.
        isConnect_ = false;
        if (!socket_->is_open()) return;

        LOG_WARN("close connnection");
        std::error_code close_error;
        socket_->close(close_error);
        if (close_error) { LOG_ERROR("close socket error:%s", close_error.message().c_str()); }
    }

    /**
     * Sends data_[0, len_) once and, on success, schedules the next round
     * until runtimes_ reaches zero.
     */
    void doWrite()
    {
        if (!isConnect_ || runtimes_ == 0) return;

        auto self = shared_from_this();
        count_++;
        LOG_DEBUG("send count:%u", count_);
        asio::async_write(
            *socket_, asio::buffer(data_, len_),
            [self, this](const std::error_code& ec, std::size_t writelen) {
                LOG_WARN("doWrite callback, writelen:%zu", writelen);
                // asio::async_write only completes without an error once the whole
                // buffer has been written, so a short write always comes with `ec`
                // set; retrying on the same socket would just fail again.
                if (ec)
                {
                    LOG_ERROR("write error:%s", ec.message().c_str());
                    doClose(&ec);
                    return;
                }
                if (writelen != len_) { LOG_ERROR("async write error!!!!!!!!!!!!!!!!!!!!!"); }
                --runtimes_;
                doWrite();
            });
    }

    /**
     * Schedules `runtimes` consecutive sends of `data[0, len)`.
     *
     * The buffer is not copied, so it must stay valid until all rounds have
     * completed. Nothing is sent if the connection is not established when
     * the posted task runs.
     *
     * @param data     buffer to send; must not be null
     * @param len      number of bytes to send per round; must be non-zero
     * @param runtimes number of rounds
     */
    void writeData(char* data, uint32_t len, uint32_t runtimes)
    {
        if (data == nullptr || len == 0)
        {
            LOG_ERROR("writeData called with empty payload");
            return;
        }

        auto self = shared_from_this();
        // Update the send state inside the posted task rather than on the
        // caller's thread, so it is not modified while the caller and the
        // executor run concurrently.
        // NOTE(review): assumes posted tasks and socket handlers are serialized
        // by the executor - confirm against ExecutorThread.
        executor_->postTask([self, this, data, len, runtimes]() {
            data_     = data;
            len_      = len;
            runtimes_ = runtimes;
            LOG_WARN("-----start send------");
            if (isConnect_) { doWrite(); }
        });
    }
};
using BigSizeConnPtr = shared_ptr<BigSizeConn>;
// Size of the static send buffer: 11 MiB, which leaves room for the 10 MiB
// test body built in main() plus its framing fields and attribute string.
const uint32_t capacity = 1024 * 1024 * 11;
// Zero-initialized buffer into which main() serializes the packet.
char data[capacity] = {0};
int main(int argc, char* argv[])
{
// g_config = new ClientConfig("config.json");
int runtimes = 10;
if (argc == 2) { runtimes = atoi(argv[1]); }
ExecutorThreadPtr th1 = make_shared<ExecutorThread>(1);
BigSizeConnPtr conn = make_shared<BigSizeConn>(th1, "127.0.0.1", 46803);
string msg(1024 * 1024 * 10, 'a');
msg += "end";
string attr =
"inlong_group_id=test_group_id&inlong_stream_id=bigpacktest&dt=1638241486778&mid=10&cnt=1&sid=0x098714199eefe7f714569014e7000000";
uint32_t body_len = msg.size();
uint32_t attr_len = attr.size();
uint32_t total_len = 9 + body_len + attr_len;
char msg_type = 3;
char* p = data;
*(uint32_t*)p = htonl(total_len);
p += 4;
*p = msg_type;
++p;
*(uint32_t*)p = htonl(body_len);
p += 4;
memcpy(p, msg.data(), body_len);
p += body_len;
*(uint32_t*)p = htonl(attr_len);
p += 4;
memcpy(p, attr.data(), attr_len);
uint32_t sendtotal = total_len + 4;
this_thread::sleep_for(chrono::seconds(2));
conn->writeData(data, sendtotal, runtimes);
this_thread::sleep_for(chrono::minutes(2));
return 0;
}