blob: 2b96f52463a3a42572cd629585b49c21abd36d1f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "flowctrl_def.h"
#include <stdio.h>
#include <time.h>
#include <unistd.h>
#include <sstream>
#include "const_config.h"
#include "logger.h"
#include "utils.h"
namespace tubemq {
using std::stringstream;
using std::lock_guard;
FlowCtrlResult::FlowCtrlResult() {
datasize_limit_ = tb_config::kMaxIntValue;
freqms_limit_ = 0;
}
FlowCtrlResult::FlowCtrlResult(int64_t datasize_limit, int32_t freqms_limit) {
datasize_limit_ = datasize_limit;
freqms_limit_ = freqms_limit;
}
FlowCtrlResult& FlowCtrlResult::operator=(const FlowCtrlResult& target) {
if (this == &target) return *this;
datasize_limit_ = target.datasize_limit_;
freqms_limit_ = target.freqms_limit_;
return *this;
}
void FlowCtrlResult::SetDataDltAndFreqLimit(int64_t datasize_limit, int32_t freqms_limit) {
datasize_limit_ = datasize_limit;
freqms_limit_ = freqms_limit;
}
void FlowCtrlResult::SetDataSizeLimit(int64_t datasize_limit) {
datasize_limit_ = datasize_limit;
}
void FlowCtrlResult::SetFreqMsLimit(int32_t freqms_limit) { freqms_limit_ = freqms_limit; }
int64_t FlowCtrlResult::GetDataSizeLimit() { return datasize_limit_; }
int32_t FlowCtrlResult::GetFreqMsLimit() { return freqms_limit_; }
FlowCtrlItem::FlowCtrlItem() {
type_ = 0;
start_time_ = 2500;
end_time_ = tb_config::kInvalidValue;
datadlt_m_ = tb_config::kInvalidValue;
datasize_limit_ = tb_config::kInvalidValue;
freqms_limit_ = tb_config::kInvalidValue;
zero_cnt_ = tb_config::kInvalidValue;
}
FlowCtrlItem::FlowCtrlItem(int32_t type, int32_t zero_cnt, int32_t freqms_limit) {
type_ = type;
start_time_ = 2500;
end_time_ = tb_config::kInvalidValue;
datadlt_m_ = tb_config::kInvalidValue;
datasize_limit_ = tb_config::kInvalidValue;
freqms_limit_ = freqms_limit;
zero_cnt_ = zero_cnt;
}
FlowCtrlItem::FlowCtrlItem(int32_t type, int32_t datasize_limit, int32_t freqms_limit,
int32_t min_data_filter_freqms) {
type_ = type;
start_time_ = 2500;
end_time_ = tb_config::kInvalidValue;
datadlt_m_ = tb_config::kInvalidValue;
datasize_limit_ = datasize_limit;
freqms_limit_ = freqms_limit;
zero_cnt_ = min_data_filter_freqms;
}
FlowCtrlItem::FlowCtrlItem(int32_t type, int32_t start_time, int32_t end_time, int64_t datadlt_m,
int64_t datasize_limit, int32_t freqms_limit) {
type_ = type;
start_time_ = start_time;
end_time_ = end_time;
datadlt_m_ = datadlt_m;
datasize_limit_ = datasize_limit;
freqms_limit_ = freqms_limit;
zero_cnt_ = tb_config::kInvalidValue;
}
FlowCtrlItem& FlowCtrlItem::operator=(const FlowCtrlItem& target) {
if (this == &target) return *this;
type_ = target.type_;
start_time_ = target.start_time_;
end_time_ = target.end_time_;
datadlt_m_ = target.datadlt_m_;
datasize_limit_ = target.datasize_limit_;
freqms_limit_ = target.freqms_limit_;
zero_cnt_ = target.zero_cnt_;
return *this;
}
int32_t FlowCtrlItem::GetFreLimit(int32_t msg_zero_cnt) const {
if (type_ != 1) {
return -1;
}
if (msg_zero_cnt >= zero_cnt_) {
return freqms_limit_;
}
return -1;
}
void FlowCtrlItem::ResetFlowCtrlValue(int32_t type, int32_t datasize_limit, int32_t freqms_limit,
int32_t min_data_filter_freqms) {
type_ = type;
start_time_ = 2500;
end_time_ = tb_config::kInvalidValue;
datadlt_m_ = tb_config::kInvalidValue;
datasize_limit_ = datasize_limit;
freqms_limit_ = freqms_limit;
zero_cnt_ = min_data_filter_freqms;
}
void FlowCtrlItem::Clear() {
type_ = 0;
start_time_ = 2500;
end_time_ = tb_config::kInvalidValue;
datadlt_m_ = tb_config::kInvalidValue;
datasize_limit_ = tb_config::kInvalidValue;
freqms_limit_ = tb_config::kInvalidValue;
zero_cnt_ = tb_config::kInvalidValue;
}
bool FlowCtrlItem::GetDataLimit(int64_t datadlt_m, int32_t curr_time,
FlowCtrlResult& flowctrl_result) const {
if (type_ != 0 || datadlt_m <= datadlt_m_) {
return false;
}
if (curr_time < start_time_ || curr_time > end_time_) {
return false;
}
flowctrl_result.SetDataDltAndFreqLimit(datasize_limit_, freqms_limit_);
return true;
}
FlowCtrlRuleHandler::FlowCtrlRuleHandler() {
flowctrl_id_.GetAndSet(tb_config::kInvalidValue);
flowctrl_info_ = "";
min_zero_cnt_.Set(tb_config::kMaxIntValue);
qrypriority_id_.Set(tb_config::kInvalidValue);
min_datadlt_limt_.Set(tb_config::kMaxLongValue);
datalimit_start_time_.Set(2500);
datalimit_end_time_.Set(tb_config::kInvalidValue);
last_update_time_ = Utils::GetCurrentTimeMillis();
}
FlowCtrlRuleHandler::~FlowCtrlRuleHandler() {
//
}
void FlowCtrlRuleHandler::UpdateDefFlowCtrlInfo(bool is_default, int32_t qrypriority_id,
int64_t flowctrl_id, const string& flowctrl_info) {
map<int32_t, vector<FlowCtrlItem> > tmp_flowctrl_map;
if (flowctrl_id == flowctrl_id_.Get()) {
return;
}
int64_t curr_flowctrl_id = flowctrl_id_.Get();
if (flowctrl_info.length() > 0) {
parseFlowCtrlInfo(flowctrl_info, tmp_flowctrl_map);
}
lock_guard<mutex> lck(config_lock_);
flowctrl_id_.Set(flowctrl_id);
qrypriority_id_.Set(qrypriority_id);
clearStatisData();
if (tmp_flowctrl_map.empty()) {
flowctrl_rules_.clear();
flowctrl_info_ = "";
} else {
flowctrl_rules_ = tmp_flowctrl_map;
flowctrl_info_ = flowctrl_info;
initialStatisData();
}
last_update_time_ = Utils::GetCurrentTimeMillis();
if (is_default) {
LOG_INFO("[Flow Ctrl] Default FlowCtrl's flowctrl_id from %ld to %ld\n", curr_flowctrl_id,
flowctrl_id);
} else {
LOG_INFO("[Flow Ctrl] Group FlowCtrl's flowctrl_id from %ld to %ld\n", curr_flowctrl_id,
flowctrl_id);
}
return;
}
void FlowCtrlRuleHandler::initialStatisData() {
vector<FlowCtrlItem>::iterator it_vec;
map<int, vector<FlowCtrlItem> >::iterator it_map;
it_map = flowctrl_rules_.find(0);
if (it_map != flowctrl_rules_.end()) {
for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
if (it_vec->GetType() != 0) {
continue;
}
if (it_vec->GetDltInM() < min_datadlt_limt_.Get()) {
min_datadlt_limt_.Set(it_vec->GetDltInM());
}
if (it_vec->GetStartTime() < datalimit_start_time_.Get()) {
datalimit_start_time_.Set(it_vec->GetStartTime());
}
if (it_vec->GetEndTime() > datalimit_end_time_.Get()) {
datalimit_end_time_.Set(it_vec->GetEndTime());
}
}
}
it_map = flowctrl_rules_.find(1);
if (it_map != flowctrl_rules_.end()) {
for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
if (it_vec->GetType() != 1) {
continue;
}
if (it_vec->GetZeroCnt() < min_zero_cnt_.Get()) {
min_zero_cnt_.Set(it_vec->GetZeroCnt());
}
}
}
it_map = flowctrl_rules_.find(3);
if (it_map != flowctrl_rules_.end()) {
for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
if (it_vec->GetType() != 3) {
continue;
}
it_vec->GetDataSizeLimit();
filter_ctrl_item_.ResetFlowCtrlValue(3, (int32_t)(it_vec->GetDataSizeLimit()),
it_vec->GetFreqMsLimit(), it_vec->GetZeroCnt());
}
}
}
void FlowCtrlRuleHandler::clearStatisData() {
min_zero_cnt_.GetAndSet(tb_config::kMaxIntValue);
min_datadlt_limt_.GetAndSet(tb_config::kMaxLongValue);
qrypriority_id_.Set(tb_config::kInvalidValue);
datalimit_start_time_.Set(2500);
datalimit_end_time_.Set(tb_config::kInvalidValue);
filter_ctrl_item_.Clear();
}
bool FlowCtrlRuleHandler::GetCurDataLimit(int64_t last_datadlt,
FlowCtrlResult& flowctrl_result) const {
struct tm utc_tm;
bool result = false;
vector<FlowCtrlItem>::const_iterator it_vec;
map<int, vector<FlowCtrlItem> >::const_iterator it_map;
// get current data limit
time_t cur_time = time(NULL);
gmtime_r(&cur_time, &utc_tm);
int curr_time = (utc_tm.tm_hour + 8) % 24 * 100 + utc_tm.tm_min;
if ((last_datadlt < min_datadlt_limt_.Get())
|| (curr_time < datalimit_start_time_.Get())
|| (curr_time > datalimit_end_time_.Get())) {
return false;
}
// search total flowctrl rule
lock_guard<mutex> lck(config_lock_);
it_map = flowctrl_rules_.find(0);
if (it_map != flowctrl_rules_.end()) {
for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
if (it_vec->GetDataLimit(last_datadlt, curr_time, flowctrl_result)) {
result = true;
break;
}
}
}
return result;
}
int32_t FlowCtrlRuleHandler::GetCurFreqLimitTime(int32_t msg_zero_cnt,
int32_t received_limit) const {
int32_t limit_data = received_limit;
vector<FlowCtrlItem>::const_iterator it_vec;
map<int, vector<FlowCtrlItem> >::const_iterator it_map;
// check min zero count
if (msg_zero_cnt < min_zero_cnt_.Get()) {
return limit_data;
}
// search rule allow value
lock_guard<mutex> lck(config_lock_);
it_map = flowctrl_rules_.find(1);
if (it_map != flowctrl_rules_.end()) {
for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
limit_data = it_vec->GetFreLimit(msg_zero_cnt);
if (limit_data >= 0) {
break;
}
}
}
return limit_data;
}
void FlowCtrlRuleHandler::GetFilterCtrlItem(FlowCtrlItem& result) const {
result.Clear();
lock_guard<mutex> lck(config_lock_);
result = filter_ctrl_item_;
}
void FlowCtrlRuleHandler::GetFlowCtrlInfo(string& flowctrl_info) const {
flowctrl_info.clear();
lock_guard<mutex> lck(config_lock_);
flowctrl_info = flowctrl_info_;
}
bool FlowCtrlRuleHandler::compareDataLimitQueue(const FlowCtrlItem& o1, const FlowCtrlItem& o2) {
if (o1.GetStartTime() >= o2.GetStartTime()) {
return true;
}
return false;
}
bool FlowCtrlRuleHandler::compareFeqQueue(const FlowCtrlItem& queue1, const FlowCtrlItem& queue2) {
return (queue1.GetZeroCnt() < queue2.GetZeroCnt());
}
bool FlowCtrlRuleHandler::parseFlowCtrlInfo(
const string& flowctrl_info, map<int32_t, vector<FlowCtrlItem> >& flowctrl_info_map) {
int32_t type;
string err_info;
stringstream ss;
rapidjson::Document doc;
// check flowctrl info length
if (flowctrl_info.length() == 0) {
return false;
}
// parse flowctrl info
if (doc.Parse(flowctrl_info.c_str()).HasParseError()) {
LOG_ERROR("Parsing flowCtrlInfo failure! flowctrl_info=%s\n", flowctrl_info.c_str());
return false;
}
if (!doc.IsArray()) {
LOG_ERROR("flowCtrlInfo's value must be array! flowctrl_info=%s\n", flowctrl_info.c_str());
return false;
}
for (uint32_t i = 0; i < doc.Size(); i++) {
vector<FlowCtrlItem> flowctrl_item_vec;
const rapidjson::Value& node_item = doc[i];
if (!node_item.IsObject()) {
continue;
}
if (!parseIntMember(err_info, node_item, "type", type, false, -2)) {
ss << "Decode Failure: ";
ss << err_info;
ss << " of type field in parse flowctrl_info!";
err_info = ss.str();
LOG_ERROR("parse flowCtrlInfo failure %s", err_info.c_str());
return false;
}
if (type < 0 || type > 3) {
ss << "type value must in [0,1,2,3] in index(";
ss << i;
ss << ") of flowctrl_info value!";
err_info = ss.str();
LOG_ERROR("parse flowCtrlInfo failure %s", err_info.c_str());
return false;
}
switch (type) {
case 1: {
if (FlowCtrlRuleHandler::parseFreqLimit(err_info, node_item, flowctrl_item_vec)) {
flowctrl_info_map[1] = flowctrl_item_vec;
} else {
LOG_ERROR("parse flowCtrlInfo's freqLimit failure: %s", err_info.c_str());
}
} break;
case 3: {
if (FlowCtrlRuleHandler::parseLowFetchLimit(err_info, node_item, flowctrl_item_vec)) {
flowctrl_info_map[3] = flowctrl_item_vec;
} else {
LOG_ERROR("parse flowCtrlInfo's lowFetchLimit failure: %s", err_info.c_str());
}
} break;
case 0: {
if (FlowCtrlRuleHandler::parseDataLimit(err_info, node_item, flowctrl_item_vec)) {
flowctrl_info_map[0] = flowctrl_item_vec;
} else {
LOG_ERROR("parse flowCtrlInfo's dataLimit failure: %s", err_info.c_str());
}
} break;
default:
break;
}
}
return true;
}
bool FlowCtrlRuleHandler::parseDataLimit(string& err_info, const rapidjson::Value& root,
vector<FlowCtrlItem>& flowctrl_items) {
int32_t type_val;
stringstream ss;
string attr_sep = delimiter::kDelimiterColon;
if (!parseIntMember(err_info, root, "type", type_val, true, 0)) {
ss << "Decode Failure: ";
ss << err_info;
ss << " of type field in parse data limit!";
err_info = ss.str();
return false;
}
// check rule type
if (!root.HasMember("rule")) {
err_info = "rule field not existed";
return false;
}
if (!root["rule"].IsArray()) {
err_info = "Illegal value, rule must be list type";
return false;
}
// parse rule info
const rapidjson::Value& obj_set = root["rule"];
for (uint32_t index = 0; index < obj_set.Size(); index++) {
int32_t start_time = 0;
int32_t end_time = 0;
int64_t datadlt_m = 0;
int64_t datasize_limit = 0;
int32_t freqms_limit = 0;
const rapidjson::Value& node_item = obj_set[index];
if (!node_item.IsObject()) {
err_info = "Illegal rule'value item, must be dict type";
return false;
}
if (!parseTimeMember(err_info, node_item, "start", start_time)) {
return false;
}
if (!parseTimeMember(err_info, node_item, "end", end_time)) {
return false;
}
if (start_time > end_time) {
ss << "start value must lower than the End value in index(";
ss << index;
ss << ") of data limit rule!";
err_info = ss.str();
return false;
}
if (!parseLongMember(err_info, node_item, "dltInM", datadlt_m, false, -1)) {
ss << "dltInM key is required in index(";
ss << index;
ss << ") of data limit rule!";
err_info = ss.str();
return false;
}
if (datadlt_m <= 20) {
ss << "dltInM value must over than 20 in index(";
ss << index;
ss << ") of data limit rule!";
err_info = ss.str();
return false;
}
datadlt_m = datadlt_m * 1024 * 1024;
if (!parseLongMember(err_info, node_item, "limitInM", datasize_limit, false, -1)) {
ss << "limitInM key is required in index(";
ss << index;
ss << ") of data limit rule!";
err_info = ss.str();
return false;
}
if (datasize_limit < 0) {
ss << "limitInM value must over than equal or bigger than zero in index(";
ss << index;
ss << ") of data limit rule!";
err_info = ss.str();
return false;
}
datasize_limit = datasize_limit * 1024 * 1024;
if (!parseIntMember(err_info, node_item, "freqInMs", freqms_limit, false, -1)) {
ss << "freqInMs key is required in index(";
ss << index;
ss << ") of data limit rule!";
err_info = ss.str();
return false;
}
if (freqms_limit < 200) {
ss << "freqInMs value must over than equal or bigger than 200 in index(";
ss << index;
ss << ") of data limit rule!";
err_info = ss.str();
return false;
}
FlowCtrlItem flowctrl_item(0, start_time, end_time, datadlt_m, datasize_limit, freqms_limit);
flowctrl_items.push_back(flowctrl_item);
}
if (!flowctrl_items.empty()) {
std::sort(flowctrl_items.begin(), flowctrl_items.end(), compareDataLimitQueue);
}
err_info = "Ok";
return true;
}
bool FlowCtrlRuleHandler::parseFreqLimit(string& err_info, const rapidjson::Value& root,
vector<FlowCtrlItem>& flowctrl_items) {
int32_t type_val;
stringstream ss;
if (!parseIntMember(err_info, root, "type", type_val, true, 1)) {
ss << "Decode Failure: ";
ss << err_info;
ss << " of type field in parse freq limit!";
err_info = ss.str();
return false;
}
if (!root.HasMember("rule")) {
err_info = "rule field not existed";
return false;
}
if (!root["rule"].IsArray()) {
err_info = "Illegal value, rule must be list type";
return false;
}
// parse rule info
const rapidjson::Value& obj_set = root["rule"];
for (uint32_t i = 0; i < obj_set.Size(); i++) {
int32_t zeroCnt = -2;
int32_t freqms_limit = -2;
const rapidjson::Value& node_item = obj_set[i];
if (!node_item.IsObject()) {
err_info = "Illegal rule'value item, must be dict type";
return false;
}
if (!parseIntMember(err_info, node_item, "zeroCnt", zeroCnt, false, -2)) {
ss << "Decode Failure: ";
ss << err_info;
ss << " of zeroCnt field in parse freq limit!";
err_info = ss.str();
return false;
}
if (!parseIntMember(err_info, node_item, "freqInMs", freqms_limit, false, -2)) {
ss << "Decode Failure: ";
ss << err_info;
ss << " of freqInMs field in parse freq limit!";
err_info = ss.str();
return false;
}
FlowCtrlItem flowctrl_item(1, zeroCnt, freqms_limit);
flowctrl_items.push_back(flowctrl_item);
}
if (!flowctrl_items.empty()) {
std::sort(flowctrl_items.begin(), flowctrl_items.end(), compareFeqQueue);
}
err_info = "Ok";
return true;
}
bool FlowCtrlRuleHandler::parseLowFetchLimit(string& err_info, const rapidjson::Value& root,
vector<FlowCtrlItem>& flowctrl_items) {
int32_t type_val;
stringstream ss;
if (!parseIntMember(err_info, root, "type", type_val, true, 3)) {
ss << "Decode Failure: ";
ss << err_info;
ss << " of type field in parse low fetch limit!";
err_info = ss.str();
return false;
}
if (!root.HasMember("rule")) {
err_info = "rule field not existed";
return false;
}
if (!root["rule"].IsArray()) {
err_info = "Illegal value, rule must be list type";
return false;
}
// parse rule info
const rapidjson::Value& nodes = root["rule"];
for (uint32_t i = 0; i < nodes.Size(); ++i) {
int32_t norm_freq_ms = 0;
int32_t filter_freq_ms = 0;
int32_t min_filter_freq_ms = 0;
FlowCtrlItem flowctrl_item;
const rapidjson::Value& node_item = nodes[i];
if (!node_item.IsObject()) {
err_info = "Illegal rule'value item, must be dict type";
return false;
}
if (node_item.HasMember("filterFreqInMs") || node_item.HasMember("minDataFilterFreqInMs")) {
if (!parseIntMember(err_info, node_item, "filterFreqInMs", filter_freq_ms, false, -1)) {
ss << "Decode Failure: ";
ss << err_info;
ss << " of filterFreqInMs field in parse low fetch limit!";
err_info = ss.str();
return false;
}
if (!parseIntMember(err_info, node_item, "minDataFilterFreqInMs", min_filter_freq_ms, false,
-1)) {
ss << "Decode Failure: ";
ss << err_info;
ss << " of minDataFilterFreqInMs field in parse low fetch limit!";
err_info = ss.str();
return false;
}
if (filter_freq_ms < 0 || filter_freq_ms > 300000) {
ss << "Decode Failure: ";
ss << "filterFreqInMs value must in [0, 300000] in index(";
ss << i;
ss << ") of low fetch limit rule!";
err_info = ss.str();
return false;
}
if (min_filter_freq_ms < 0 || min_filter_freq_ms > 300000) {
ss << "Decode Failure: ";
ss << "minDataFilterFreqInMs value must in [0, 300000] in index(";
ss << i;
ss << ") of low fetch limit rule!";
err_info = ss.str();
return false;
}
if (min_filter_freq_ms < filter_freq_ms) {
ss << "Decode Failure: ";
ss << "minDataFilterFreqInMs must lower than filterFreqInMs in index(";
ss << i;
ss << ") of low fetch limit rule!";
err_info = ss.str();
return false;
}
}
if (node_item.HasMember("normFreqInMs")) {
if (!parseIntMember(err_info, node_item, "normFreqInMs", norm_freq_ms, false, -1)) {
ss << "Decode Failure: ";
ss << err_info;
ss << " of normFreqInMs field in parse low fetch limit!";
err_info = ss.str();
return false;
}
if (norm_freq_ms < 0 || norm_freq_ms > 300000) {
ss << "Decode Failure: ";
ss << "normFreqInMs value must in [0, 300000] in index(";
ss << i;
ss << ") of low fetch limit rule!";
err_info = ss.str();
return false;
}
}
flowctrl_item.ResetFlowCtrlValue(3, norm_freq_ms, filter_freq_ms, min_filter_freq_ms);
flowctrl_items.push_back(flowctrl_item);
}
err_info = "Ok";
return true;
}
bool FlowCtrlRuleHandler::parseStringMember(string& err_info, const rapidjson::Value& root,
const char* key, string& value, bool compare_value,
string required_val) {
// check key if exist
if (!root.HasMember(key)) {
err_info = "Field not existed";
return false;
}
if (!root[key].IsString()) {
err_info = "Illegal value, must be string type";
return false;
}
if (compare_value) {
if (root[key].GetString() != required_val) {
err_info = "Illegal value, not required value content";
return false;
}
}
value = root[key].GetString();
return true;
}
bool FlowCtrlRuleHandler::parseLongMember(string& err_info, const rapidjson::Value& root,
const char* key, int64_t& value, bool compare_value,
int64_t required_val) {
if (!root.HasMember(key)) {
err_info = "Field not existed";
return false;
}
if (!root[key].IsNumber()) {
err_info = "Illegal value, must be number type";
return false;
}
if (compare_value) {
if ((int64_t)root[key].GetInt64() != required_val) {
err_info = "Illegal value, not required value content";
return false;
}
}
value = (int64_t)root[key].GetInt64();
return true;
}
bool FlowCtrlRuleHandler::parseIntMember(string& err_info, const rapidjson::Value& root,
const char* key, int32_t& value, bool compare_value,
int32_t required_val) {
if (!root.HasMember(key)) {
err_info = "Field not existed";
return false;
}
if (!root[key].IsInt()) {
err_info = "Illegal value, must be int type";
return false;
}
if (compare_value) {
if (root[key].GetInt() != required_val) {
err_info = "Illegal value, not required value content";
return false;
}
}
value = root[key].GetInt();
return true;
}
bool FlowCtrlRuleHandler::parseTimeMember(string& err_info, const rapidjson::Value& root,
const char* key, int32_t& value) {
// check key if exist
stringstream ss;
if (!root.HasMember(key)) {
ss << "field ";
ss << key;
ss << " not existed!";
err_info = ss.str();
return false;
}
if (!root[key].IsString()) {
ss << "field ";
ss << key;
ss << " must be string type!";
err_info = ss.str();
return false;
}
string::size_type pos1;
string str_value = root[key].GetString();
string attr_sep = delimiter::kDelimiterColon;
pos1 = str_value.find(attr_sep);
if (string::npos == pos1) {
ss << "field ";
ss << key;
ss << " must be 'aa:bb' and 'aa','bb' must be int value format!";
err_info = ss.str();
return false;
}
string sub_str_1 = str_value.substr(0, pos1);
string sub_str_2 = str_value.substr(pos1 + attr_sep.size(), str_value.size());
int32_t in_hour = atoi(sub_str_1.c_str());
int32_t in_minute = atoi(sub_str_2.c_str());
if (in_hour < 0 || in_hour > 24) {
ss << "field ";
ss << key;
ss << " -hour value must in [0,23]!";
err_info = ss.str();
return false;
}
if (in_minute < 0 || in_minute > 59) {
ss << "field ";
ss << key;
ss << " -minute value must in [0,59]!";
err_info = ss.str();
return false;
}
value = in_hour * 100 + in_minute;
return true;
}
} // namespace tubemq