blob: 58d4b4e7b20d6fa928381cf44a6f2d640a19673f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef ROCKETMQ_CLIENT4CPP_EXAMPLE_COMMON_H_
#define ROCKETMQ_CLIENT4CPP_EXAMPLE_COMMON_H_
#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>
#ifndef WIN32
#include "unistd.h"
#endif
#include "Arg_helper.h"
#include "DefaultMQProducer.h"
#include "DefaultMQPullConsumer.h"
#include "DefaultMQPushConsumer.h"
using namespace std;
std::atomic<int> g_msgCount(1);
class RocketmqSendAndConsumerArgs {
public:
RocketmqSendAndConsumerArgs()
: body("msgbody for test"),
thread_count(std::thread::hardware_concurrency()),
broadcasting(false),
syncpush(false),
SelectUnactiveBroker(false),
IsAutoDeleteSendCallback(false),
retrytimes(5),
PrintMoreInfo(false) {}
public:
std::string namesrv;
std::string namesrv_domain;
std::string groupname;
std::string topic;
std::string body;
int thread_count;
bool broadcasting;
bool syncpush;
bool SelectUnactiveBroker; // default select active broker
bool IsAutoDeleteSendCallback;
int retrytimes; // default retry 5 times;
bool PrintMoreInfo;
};
class TpsReportService {
public:
TpsReportService() : tps_interval_(1), quit_flag_(false), tps_count_(0) {}
void start() {
if (tps_thread_ == NULL) {
std::cout << "tps_thread_ is null" << std::endl;
return;
}
tps_thread_.reset(new std::thread(std::bind(&TpsReportService::TpsReport, this)));
}
~TpsReportService() {
quit_flag_.store(true);
if (tps_thread_ == NULL) {
std::cout << "tps_thread_ is null" << std::endl;
return;
}
if (tps_thread_->joinable())
tps_thread_->join();
}
void Increment() { ++tps_count_; }
void TpsReport() {
while (!quit_flag_.load()) {
std::this_thread::sleep_for(tps_interval_);
std::cout << "tps: " << tps_count_.load() << std::endl;
tps_count_.store(0);
}
}
private:
std::chrono::seconds tps_interval_;
std::shared_ptr<std::thread> tps_thread_;
std::atomic<bool> quit_flag_;
std::atomic<long> tps_count_;
};
void PrintPullResult(rocketmq::PullResult* result) {
std::cout << result->toString() << std::endl;
if (result->pullStatus == rocketmq::FOUND) {
std::cout << result->toString() << endl;
std::vector<rocketmq::MQMessageExt>::iterator it = result->msgFoundList.begin();
for (; it != result->msgFoundList.end(); ++it) {
cout << "=======================================================" << endl << (*it).toString() << endl;
}
}
}
static void PrintRocketmqSendAndConsumerArgs(const RocketmqSendAndConsumerArgs& info) {
std::cout << "nameserver: " << info.namesrv << endl
<< "topic: " << info.topic << endl
<< "groupname: " << info.groupname << endl
<< "produce content: " << info.body << endl
<< "msg count: " << g_msgCount.load() << endl
<< "thread count: " << info.thread_count << endl;
}
static void help() {
std::cout << "need option,like follow: \n"
<< "-n nameserver addr, if not set -n and -i ,no nameSrv will be got \n"
"-i nameserver domain name, if not set -n and -i ,no nameSrv will be "
"got \n"
"-g groupname \n"
"-t msg topic \n"
"-m messagecout(default value: 1) \n"
"-c content(default value: only test ) \n"
"-b (BROADCASTING model, default value: CLUSTER) \n"
"-s sync push(default is async push)\n"
"-r setup retry times(default value: 5 times)\n"
"-u select active broker to send msg(default value: false)\n"
"-d use AutoDeleteSendcallback by cpp client(defalut value: false) \n"
"-T thread count of send msg or consume msg(defalut value: system cpu "
"core number) \n"
"-v print more details information \n";
}
static bool ParseArgs(int argc, char* argv[], RocketmqSendAndConsumerArgs* info) {
#ifndef WIN32
int ch;
while ((ch = getopt(argc, argv, "n:i:g:t:m:c:b:s:h:r:T:bu")) != -1) {
switch (ch) {
case 'n':
info->namesrv.insert(0, optarg);
break;
case 'i':
info->namesrv_domain.insert(0, optarg);
break;
case 'g':
info->groupname.insert(0, optarg);
break;
case 't':
info->topic.insert(0, optarg);
break;
case 'm':
g_msgCount.store(atoi(optarg));
break;
case 'c':
info->body.insert(0, optarg);
break;
case 'b':
info->broadcasting = true;
break;
case 's':
info->syncpush = true;
break;
case 'r':
info->retrytimes = atoi(optarg);
break;
case 'u':
info->SelectUnactiveBroker = true;
break;
case 'T':
info->thread_count = atoi(optarg);
break;
case 'v':
info->PrintMoreInfo = true;
break;
case 'h':
help();
return false;
default:
help();
return false;
}
}
#else
rocketmq::Arg_helper arg_help(argc, argv);
info->namesrv = arg_help.get_option_value("-n");
info->namesrv_domain = arg_help.get_option_value("-i");
info->groupname = arg_help.get_option_value("-g");
info->topic = arg_help.get_option_value("-t");
info->broadcasting = atoi(arg_help.get_option_value("-b").c_str());
string msgContent(arg_help.get_option_value("-c"));
if (!msgContent.empty())
info->body = msgContent;
info->syncpush = atoi(arg_help.get_option_value("-s").c_str());
int retrytimes = atoi(arg_help.get_option_value("-r").c_str());
if (retrytimes > 0)
info->retrytimes = retrytimes;
info->SelectUnactiveBroker = atoi(arg_help.get_option_value("-u").c_str());
int thread_count = atoi(arg_help.get_option_value("-T").c_str());
if (thread_count > 0)
info->thread_count = thread_count;
info->PrintMoreInfo = atoi(arg_help.get_option_value("-v").c_str());
g_msgCount = atoi(arg_help.get_option_value("-m").c_str());
#endif
if (info->groupname.empty() || info->topic.empty() || (info->namesrv_domain.empty() && info->namesrv.empty())) {
std::cout << "please use -g to setup groupname and -t setup topic \n";
help();
return false;
}
return true;
}
#endif // ROCKETMQ_CLIENT4CPP_EXAMPLE_COMMON_H_