blob: c133f2aadca7beb1e439b258fe2f8c091d2aebe6 [file] [log] [blame]
// influxdb2test.cpp: 定义应用程序的入口点。
//
#include <unistd.h>
#include <iostream>
#include <sstream>
#include <string>
#include <memory>
#include "iotdbtest.h"
#include "cpr/cpr.h"
#include "nlohmann/json.hpp"
#include "iotdbclient.h"
#include "ThreadPool.h"
#include <fmt/format.h>
#include <mutex>
#include <chrono>
#include <fmt/chrono.h>
// for convenience
//using json = nlohmann::json;
using namespace std;
using namespace chrono;
shared_ptr<ThreadPool> gthreadpools;
shared_ptr<ThreadPool> gthreadpoolm;
string g_address = "localhost:8086";
int g_start_no_mul = 1; //多点写,起始点号
int g_num_onetime_mul = 5000; //多点写,每包数据的点数
int g_write_times_mul = 3600 * 24 * 365; //多点写,写次数
int g_time_offset_mul = 0; //多点写,往前推的时间偏移(S),即从当前时间往前推多少时间的时标开始写
int g_sleep_time_mul = 1000; //单点写,每次写的sleep时间
int g_start_no_single = 1; //单点写,起始点号
int g_num_onetime_single = 1; //单点写,每包数据的点(tag)数
int g_write_time_single = 3600; //单点写,每次每个点(tag)写的时间数(s)
int g_time_offset_single = 31536000; //单点写,往前推的时间偏移(S),即从当前时间往前推多少时间的时标开始写
int g_sleep_time_single = 1000; //单点写,每次写的sleep时间
int g_create_tag_start_no = 1; //起始点号
int g_create_tag_onetime_num = 10000; // 每次创建多少点
int g_create_tag_num = 50000; //要创建的总点数
std::atomic<int> g_min_write_time(50000); //最小写时间
std::atomic<int> g_max_write_time(0); //最大写时间
time_t test_start_time = 0; //测试开始时间
//CRITICAL_SECTION g_criticalSection;
mutex g_mutex;
void workerthreads()
{
//写数据
//string strUrlw = "http://localhost:8086/write";
IotdbClient client;
string strUrlw = "http://";
strUrlw += g_address;
strUrlw += "/api/v2/write";
int iRet = -1;
string strResponse;
int iStartNo = 0;
int iEndNo = 0;
int write_times = 0;
std::unique_lock<std::mutex> lk(g_mutex);
iStartNo = g_start_no_single;
iEndNo = iStartNo + g_num_onetime_single;
g_start_no_single = iEndNo;
write_times = (g_time_offset_single - 1) / g_write_time_single + 1;
lk.unlock();
//int iBufLen = 1024*500;
time_t tt_now = time(NULL);
fmt::memory_buffer data_buffer;
data_buffer.resize(1024*5);
int64_t time1 = 0;
int write_count = 0;
double dValue = 0;
int i_q = 1;
srand(time(nullptr));
for (int j = 0; j < write_times; j++)
{
auto time_1 = steady_clock::now();
data_buffer.clear();
for (int i = iStartNo; i < iEndNo; i++)
{
for (int k = 0; k < g_write_time_single; k++)
{
i_q = k % 4;
fmt::format_to(data_buffer, "v,TagName=Tag_{}", i);
dValue = (double)rand() / 101;
fmt::format_to(data_buffer, " value={},q={} ", dValue, i_q);
time1 = (tt_now - g_time_offset_single + g_write_time_single* j + k) * 1000000000; //时间转换为ns
fmt::format_to(data_buffer, "{}\n", time1);
}
}
auto time_2 = steady_clock::now();
string data = fmt::to_string(data_buffer);
bool bRet = client.WriteData(strUrlw, "org_test", "bucket_test", data);
write_count++;
auto time_3 = steady_clock::now();
auto duration_write = duration_cast<milliseconds>(time_3 - time_2);
auto duration_make_pkt = duration_cast<milliseconds>(time_2 - time_1);
cout << "iStartNo=" << iStartNo << ",count=" << write_count << ",write time=" << duration_write.count() << ",make pkg time=" << duration_make_pkt.count() << ",iRet=" << iRet << ",ilen=" << data.size() << ",Response=" << strResponse << endl;
if (g_sleep_time_single > 0)//sleep时间大于0
{
auto use_time = duration_cast<milliseconds>(time_3 - time_1);
auto sleep_time = std::chrono::milliseconds(g_sleep_time_single) - use_time;
if (sleep_time.count() > 0)
{
this_thread::sleep_for(sleep_time);
}
}
}
}
void workerthreadaggregation()
{
//写数据
//string strUrlw = "http://localhost:8086/query?";
string strUrlw = "http://172.21.31.51:8086/query?";
IotdbClient client;
int iRet = -1;
string strResponse;
int iTagNo = 0;
int iTagNum = 0;
std::unique_lock<std::mutex> lk(g_mutex);
iTagNum = g_num_onetime_single;
iTagNo = g_start_no_single;
g_start_no_single += g_num_onetime_single;
iTagNum = g_num_onetime_single;
lk.unlock();
//int iBufLen = 1024*500;
time_t tt = time(NULL);
fmt::memory_buffer data_buffer;
data_buffer.resize(1024 * 5);
int iLen = 0;
int iCount = 0;
long lValue = 0;
for (int k = 0; k < iTagNum; k++)
{
data_buffer.clear();
/*
wPkt << "SELECT max(value),min(value),mean(value) INTO mydb.longyears.TagValue20m FROM TagValue where TagName='Tag_";
wPkt << iTagNo;
wPkt << "'";
wPkt << " GROUP BY time(20m),TagName";
wPkt.GetUsedBuf(&szData, iLen);
cout << "aggregation begain" << endl;
pClient->aggregation(strUrlw, "mydb", szData, strResponse, 300000);
cout << "aggregation end " << strResponse << endl;
iTagNo++;
*/
}
}
/**
* @brief 多点单时刻写
*/
void workerthreadm()
{
//写数据
//string strUrlw = "http://172.21.31.51:8086/write";
//string strUrlw = "http://localhost:8086/write";
string strUrlw = "http://";
strUrlw += g_address;
strUrlw += "/api/v2/write";
//string strUrlw = "http://localhost:8086/write";
//string strUrlw = "http://localhost:8428/write";
IotdbClient client;
client.SessionOpen("127.0.0.1", 6667, "root", "root");
int iRet = -1;
string strResponse;
int iStartNo = 0;
int iEndNo = 0;
std::unique_lock<std::mutex> lk(g_mutex);
iStartNo = g_start_no_mul;
iEndNo = iStartNo + g_num_onetime_mul;
g_start_no_mul = iEndNo;
lk.unlock();
time_t tt = time(NULL);
fmt::memory_buffer data_buffer;
data_buffer.resize(1024 * 5);
int iLen = 0;
long interval = 31536 * 1000;
long interval1 = interval;
int64_t time1 = 0;
int iCount = 0;
long lValue = 0;
double dValue = 1.1;
double tag1_value = 0;
int i_q = 1;
string deviceId;
vector<string> measurements;
measurements.push_back("v");
measurements.push_back("q");
time_t start_tt = time(NULL);
//有前推时间,造较早时间的数据
if (g_time_offset_mul > 0)
{
tt = time(NULL);
start_tt = tt;
tt -= g_time_offset_mul;
}
srand(time(nullptr));
for (int j = 0; j < g_write_times_mul; j++)
{
vector<string> deviceIds;
vector<vector<string>> measurementsList;
vector<std::vector<TSDataType::TSDataType>> typesList;
vector<vector<string>> valuesList;
vector<std::vector<char*>> sz_valuesList;
vector<int64_t> timestamps;
vector<string> values_temp;
string value;
data_buffer.clear();
i_q = j % 4;
time_t now_tt = time(NULL);
if (0 == g_time_offset_mul)
{
tt = time(NULL);
}
else
{
tt++;
}
auto time_1 = steady_clock::now();
//fmt::format_to(data_buffer, "INSERT INTO v VALUES");
for (int i = iStartNo; i < iEndNo; i++)
{
dValue = (double)rand() / 101;
string deviceId = fmt::format("root.test2.tag_{}", i);
deviceIds.push_back(deviceId);
measurementsList.push_back(measurements);
//vector<string> values;
vector<char*> sz_values;
value = fmt::format("{}", dValue);
values_temp.push_back(value);
//sz_values.push_back((char*)value.c_str());
sz_values.push_back((char*)&dValue);
value = fmt::format("{}", i_q);
values_temp.push_back(value);
//sz_values.push_back((char*)value.c_str());
sz_values.push_back((char*)&i_q);
sz_valuesList.push_back(sz_values);
vector<TSDataType::TSDataType> dataTypes;
dataTypes.push_back(TSDataType::DOUBLE);
dataTypes.push_back(TSDataType::INT32);
typesList.push_back(dataTypes);
//time1 = tt * 1000000000;
time1 = tt * 1000;
timestamps.push_back(time1);
if (i == iStartNo)
{
tag1_value = dValue;
}
}
iCount++;
if (iCount > 1000)
{
iCount = 0;
}
auto time_2 = steady_clock::now();
interval1 = interval - j;
/*
for (int i = 0; i < 5000; i++)
{
ssDatas << "Tag_0000009";
ssDatas << ",";
ssDatas<<"TagName=Tag_0000009";
ssDatas<<" value=";
ssDatas<<GetTickCount();
ssDatas<<" ";
ssDatas<<(tt-interval)*1000000000;
ssDatas<<endl;
interval--;
}
*/
//strDatasw = ssDatas.str();
//ssDatas.str("");
//ssDatas.clear();
auto time_3 = steady_clock::now();
string data = fmt::to_string(data_buffer);
//bool bRet = client.WriteDataTcp(data);
client.insertAlignedRecords(deviceIds, timestamps, measurementsList, typesList, sz_valuesList);
//bool bRet = client.Exec("http://localhost:9000/exec",data);
auto time_4 = steady_clock::now();
auto duration_write = duration_cast<milliseconds>(time_4 - time_3);
auto duration_make_pkt = duration_cast<milliseconds>(time_2 - time_1);
/*
cout << "iStartNo=" << iStartNo << ",count=" << j << ",write time=" << duration_write.count()
<< ",make pkg time=" << duration_make_pkt.count() << ",iRet=" << iRet << ",ilen=" << iLen << ",Response=" << strResponse << endl;
*/
if (duration_write.count() < g_min_write_time)
{
g_min_write_time = duration_write.count();
}
if (duration_write.count() > g_max_write_time)
{
g_max_write_time = duration_write.count();
}
string str_time = fmt::format("{:%Y-%m-%d %H:%M:%S}", fmt::localtime(tt));
//string str_start_time = fmt::format("{:%Y-%m-%d %H:%M:%S}", fmt::localtime(start_tt));
string str_now_time = fmt::format("{:%Y-%m-%d %H:%M:%S}", fmt::localtime(now_tt));
//fmt::print("The date is {:%Y-%m-%d}.", fmt::localtime(t));
cout << "iStartNo=" << iStartNo << ",time=" << str_time << ",starttime=" << start_tt << ",now=" << str_now_time
<< ",tag1=" << tag1_value << ",count=" << j << ",write time=" << duration_write.count() << ",make pkg time="
<< duration_make_pkt.count() << ",iRet=" << iRet << ",ilen=" << iLen << ",Response=" << strResponse
<<",min write time="<< g_min_write_time<<",max write time=" << g_max_write_time<<endl;
if (g_sleep_time_mul > 0)//sleep时间大于0
{
auto use_time = duration_cast<milliseconds>(time_4 - time_1);
auto sleep_time = std::chrono::milliseconds(g_sleep_time_mul) - use_time;
if (sleep_time.count() > 0)
{
this_thread::sleep_for(sleep_time);
}
}
//this_thread::sleep_for(std::chrono::milliseconds(1000)- duration_write);
//Sleep(abs((int)(1000 - (dwTick2 - dwTick3))));
}
time_t test_end_time = time(NULL);
string str_start_time = fmt::format("{:%Y-%m-%d %H:%M:%S}", fmt::localtime(test_start_time));
string str_end_time = fmt::format("{:%Y-%m-%d %H:%M:%S}", fmt::localtime(test_end_time));
cout << "test end,start time:" << str_start_time << ",end time:" << str_end_time<< endl;
}
void createMultiTimeseries(IotdbClient& client)
{
/*
vector<string> paths;
vector<TSDataType::TSDataType> tsDataTypes;
vector<TSEncoding::TSEncoding> tsEncodings;
vector<CompressionType::CompressionType> compressionTypes;
*/
/*
int g_create_tag_start_no = 1; //起始点号
int g_create_tag_onetime_num = 10000; // 每次创建多少点
int g_create_tag_num = 50000; //要创建的总点数
*/
int j_count = g_create_tag_num / g_create_tag_onetime_num;
if (g_create_tag_num % g_create_tag_onetime_num == 0)
{
j_count = g_create_tag_num / g_create_tag_onetime_num;
}
else
{
j_count = g_create_tag_num / g_create_tag_onetime_num + 1;
}
int tag_count = 0; //要创建的总点数计数
int tag_no = g_create_tag_start_no; //点号
for (int j = 0; j < j_count; j++)
{
/*
paths.clear();
tsDataTypes.clear();
tsEncodings.clear();
compressionTypes.clear();
*/
vector<string> paths;
vector<TSDataType::TSDataType> tsDataTypes;
vector<TSEncoding::TSEncoding> tsEncodings;
vector<CompressionType::CompressionType> compressionTypes;
string path;
for (int i = 1; i <= g_create_tag_onetime_num && tag_count < g_create_tag_num; i++)
{
path = fmt::format("root.test2.tag_{}.v", tag_no);
paths.push_back(path);
tsDataTypes.push_back(TSDataType::DOUBLE);
tsEncodings.push_back(TSEncoding::GORILLA);
compressionTypes.push_back(CompressionType::LZ4);
path = fmt::format("root.test2.tag_{}.q", tag_no);
paths.push_back(path);
tsDataTypes.push_back(TSDataType::INT64);
tsEncodings.push_back(TSEncoding::RLE);
compressionTypes.push_back(CompressionType::LZ4);
tag_no++;
tag_count++;
}
client.createMultiTimeseries(paths, tsDataTypes, tsEncodings, compressionTypes);
}
std::cout << "createMultiTimeseries end"<<endl;
}
void insertRecords(IotdbClient& client)
{
string deviceId;
vector<string> measurements;
measurements.push_back("v");
measurements.push_back("q");
for (int k = 0; k < 1000; k++)
{
time_t now_tt = time(NULL);
for (int j = 0; j < 5; j++)
{
vector<string> deviceIds;
vector<vector<string>> measurementsList;
vector<vector<string>> valuesList;
vector<int64_t> timestamps;
for (int i = 1; i <= 10000; i++)
{
string deviceId = fmt::format("root.test2.tag_{}", j * 10000 + i);
deviceIds.push_back(deviceId);
measurementsList.push_back(measurements);
vector<string> values;
values.push_back("100");
values.push_back("2");
valuesList.push_back(values);
int64_t time1 = now_tt * 1000000000;
timestamps.push_back(time1);
}
client.insertRecords(deviceIds, timestamps, measurementsList, valuesList);
}
cout << "insertRecords count:" << k<<endl;
sleep(1000);
}
cout << "insertRecords end" << endl;
}
void QueryTest(std::string& sql)
{
if (sql.empty())
{
return;
}
IotdbClient client;
auto time_1 = steady_clock::now();
client.SessionOpen("127.0.0.1", 6667, "root", "root");
//client.SetStorageGroup("root.test2");
bool query = client.Query(sql);
client.SessionClose();
auto time_2 = steady_clock::now();
auto query_write = duration_cast<milliseconds>(time_2 - time_1).count();
cout << "QueryTest:sql=" << sql <<",res:"<< query<<",use time(ms):"<< query_write<< endl;
}
int main(int argc, char* argv[])
{
IotdbClient client;
string url = "http://localhost:9000/exec";
/*
string query;
query = "CREATE TABLE IF NOT EXISTS tagvalue1(name STRING, value double,ts timestamp) TIMESTAMP(ts) PARTITION BY MONTH";
client.Exec(url, query);
*/
/*
//cout << "Hello CMake." << endl;
url = "http://";
url += g_address;
url += "/api/v2/setup";
UserInfor user_infor;
user_infor.user_name = "admin";
user_infor.password = "12345678";
user_infor.org = "org_test";
user_infor.bucket = "bucket_test";
user_infor.token = "token_string";
//client.InitUser_Org_Bucket(url,user_infor);
user_infor.user_name = "admin";
user_infor.password = "12345678";
user_infor.org = "org_test2";
user_infor.bucket = "bucket_test2";
user_infor.token = "token_string2";
url = "http://";
url += g_address;
url += "/api/v2/setup/user";
//client.AddUser_Org_Bucket(url,user_infor);
//std::unique_lock<std::mutex> lk(g_mutex);
*/
int ithreadnum0 = 1;
int imode = 1;
int ithreadnum1 = 5;
if (argc > 1)
{
g_address = argv[1];
if (argc > 2)
{
imode = atoi(argv[2]);
}
if (0 == imode)//写单标签模式
{
if (argc > 3)
{
ithreadnum0 = atoi(argv[3]);
}
if (argc > 4)
{
g_start_no_single = atoi(argv[4]);
}
if (argc > 5)
{
g_num_onetime_single = atoi(argv[5]);
}
if (argc > 6)
{
g_write_time_single = atoi(argv[6]);
}
if (argc > 7)
{
g_time_offset_single = atoi(argv[7]);
}
if (argc > 8)
{
g_sleep_time_single = atoi(argv[8]);
}
}
else if (1 == imode)//批量标签模式
{
if (argc > 3)
{
ithreadnum1 = atoi(argv[3]);
}
if (argc > 4)
{
g_write_times_mul = atoi(argv[4]);
}
if (argc > 5)
{
g_start_no_mul = atoi(argv[5]);
}
if (argc > 6)
{
g_num_onetime_mul = atoi(argv[6]);
}
if (argc > 7)
{
g_time_offset_mul = atoi(argv[7]);
}
if (argc > 8)
{
g_sleep_time_mul = atoi(argv[8]);
}
}
else if (2 == imode)//写单标签模式和批量标签模式
{
if (argc > 10)
{
ithreadnum0 = atoi(argv[3]);
g_start_no_single = atoi(argv[4]);
g_num_onetime_single = atoi(argv[5]);
g_write_time_single = atoi(argv[6]);
g_time_offset_single = atoi(argv[7]);
g_sleep_time_single = atoi(argv[8]);
ithreadnum1 = atoi(argv[9]);
g_write_times_mul = atoi(argv[10]);
g_start_no_mul = atoi(argv[11]);
g_num_onetime_mul = atoi(argv[12]);
g_time_offset_mul = atoi(argv[13]);
g_sleep_time_mul = atoi(argv[14]);
}
}
else if (3 == imode) //聚合
{
if (argc > 3)
{
ithreadnum0 = atoi(argv[3]);
}
if (argc > 4)
{
g_start_no_single = atoi(argv[5]);
}
if (argc > 6)
{
g_num_onetime_single = atoi(argv[7]);
}
}
else if (3 == imode) //聚合
{
if (argc > 2)
{
ithreadnum0 = atoi(argv[2]);
}
if (argc > 3)
{
g_start_no_single = atoi(argv[3]);
}
if (argc > 4)
{
g_num_onetime_single = atoi(argv[4]);
}
}
else if (4 == imode)
{
if (argc > 3)
{
g_create_tag_num = atoi(argv[3]);
}
if (argc > 4)
{
g_create_tag_start_no = atoi(argv[4]);
}
if (argc > 5)
{
g_create_tag_onetime_num = atoi(argv[5 ]);
}
}
}
cout <<"param num: " << argc<<endl;
//<< " " << *argv << endl;
cout<< "mode= " << imode << endl
<< "g_address= " << g_address << endl
<< endl
<< "threadnum0= " << ithreadnum0 << endl
<< "g_start_no_single= " << g_start_no_single << endl
<< "g_num_onetime_single=" << g_num_onetime_single << endl
<< "g_write_time_single= " << g_write_time_single << endl
<< "g_time_offset_single=" << g_time_offset_single << endl
<< "g_sleep_time_single= " << g_sleep_time_single << endl
<< endl
<< "threadnum1= " << ithreadnum1 << endl
<< "g_write_times_mul= " << g_write_times_mul << endl
<< "g_start_no_mul= " << g_start_no_mul << endl
<< "g_num_onetime_mul= "<< g_num_onetime_mul << endl
<< "g_time_offset_mul= " << g_time_offset_mul << endl
<< "g_sleep_time_mul= "<< g_sleep_time_mul<< endl
<< "g_create_tag_num= " << g_create_tag_num << endl
<< "g_create_tag_start_no= " << g_create_tag_start_no << endl
<< "g_create_tag_onetime_num= " << g_create_tag_onetime_num << endl;
char c1;
cin >> c1;
//return 0;
//写单标签模式
if (0 == imode)
{
gthreadpools = make_shared<ThreadPool>(ithreadnum0);
int par = 1;
for (int i = 0; i < ithreadnum0; i++)
{
gthreadpools->enqueue(std::bind(&workerthreads));
}
}
else if (1 == imode) //批量标签模式
{
test_start_time = time(NULL);
gthreadpoolm = make_shared<ThreadPool>(ithreadnum1);
int par = 1;
for (int i = 0; i < ithreadnum1; i++)
{
gthreadpoolm->enqueue(std::bind(&workerthreadm));
}
}
else if (2 == imode)//写单标签模式和批量标签模式
{
gthreadpools = make_shared<ThreadPool>(ithreadnum0);
int par = 1;
for (int i = 0; i < ithreadnum0; i++)
{
gthreadpools->enqueue(std::bind(&workerthreads));
}
gthreadpoolm = make_shared<ThreadPool>(ithreadnum1);
for (int i = 0; i < ithreadnum1; i++)
{
gthreadpoolm->enqueue(std::bind(&workerthreadm));
}
}
else if (3 == imode)//聚合
{
gthreadpools = make_shared<ThreadPool>(ithreadnum0);
int par = 1;
for (int i = 0; i < ithreadnum0; i++)
{
gthreadpools->enqueue(std::bind(&workerthreadaggregation));
}
}
else if (4 == imode) //创建时间序列
{
client.SessionOpen("127.0.0.1", 6667, "root", "root");
client.SetStorageGroup("root.test2");
createMultiTimeseries(client);
//insertRecords(client);
client.SessionClose();
}
else if (5 == imode) //查询
{
while (true)
{
std::string s_input;
std::getline(std::cin, s_input);
if (s_input == "q" || s_input == "Q")
{
break;
}
QueryTest(s_input);
}
}
//lk.unlock();
char c;
cin >> c;
stringstream ss;
url = "http://localhost:8086/write";
ss << "mem,host=host1 used_percent=23.43234543 1556896326" << endl;
ss << "mem,host=host2 used_percent=26.81522361 1556896326" << endl;
ss << "mem,host=host1 used_percent=22.52984738 1556896336" << endl;
ss << "mem,host=host2 used_percent=27.18294630 1556896336" << endl;
string sData = ss.str();
//client.WriteData(url,"org_test","bucket_test", sData);
return 0;
}