[improve] Optimize GreptimeDB time-series statistics. (#3776)
Co-authored-by: Calvin <zhengqiwei@apache.org>
diff --git a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
index a824111..adf4111 100644
--- a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
+++ b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
@@ -45,6 +45,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
@@ -68,8 +70,10 @@
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
+import org.springframework.util.MultiValueMap;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
+import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;
/**
@@ -86,6 +90,8 @@
private static final String LABEL_KEY_FIELD = "__field__";
private static final String LABEL_KEY_INSTANCE = "instance";
private static final String LOG_TABLE_NAME = "hertzbeat_logs";
+ private static final String LABEL_KEY_START_TIME = "start";
+ private static final String LABEL_KEY_END_TIME = "end";
private GreptimeDB greptimeDb;
@@ -202,60 +208,148 @@
@Override
public Map<String, List<Value>> getHistoryMetricData(Long monitorId, String app, String metrics, String metric,
String label, String history) {
+        // Resolve the look-back window, choose a step for it, then run the raw range query.
+        // Note: `label` is not forwarded to the query.
+        Map<String, Long> window = getTimeRange(history);
+        Long windowStart = window.get(LABEL_KEY_START_TIME);
+        Long windowEnd = window.get(LABEL_KEY_END_TIME);
+        return getHistoryData(windowStart, windowEnd, getTimeStep(windowStart, windowEnd),
+                monitorId, app, metrics, metric);
+    }
+
+ private String getTableName(String metrics) {
+ return metrics;
+ }
+
+    @Override
+    public Map<String, List<Value>> getHistoryIntervalMetricData(Long monitorId, String app, String metrics,
+                                                                 String metric, String label, String history) {
+        Map<String, Long> timeRange = getTimeRange(history);
+        Long start = timeRange.get(LABEL_KEY_START_TIME);
+        Long end = timeRange.get(LABEL_KEY_END_TIME);
+        String step = getTimeStep(start, end);
+
+        // Raw points first; their max/min/mean fields are then filled by step-sized *_over_time queries.
+        Map<String, List<Value>> instanceValuesMap = getHistoryData(start, end, step, monitorId, app, metrics, metric);
+
+        // Restrict the aggregate queries to the span actually covered by the raw series so that their
+        // evaluation points line up with the raw points. The span is taken over every series (not one
+        // arbitrary entry of an unordered map). With no raw data there is nothing to enrich, so return
+        // early instead of failing with IndexOutOfBoundsException.
+        long firstTime = Long.MAX_VALUE;
+        long lastTime = Long.MIN_VALUE;
+        for (List<Value> values : instanceValuesMap.values()) {
+            if (values != null && !values.isEmpty()) {
+                firstTime = Math.min(firstTime, values.get(0).getTime());
+                lastTime = Math.max(lastTime, values.get(values.size() - 1).getTime());
+            }
+        }
+        if (firstTime > lastTime) {
+            return instanceValuesMap;
+        }
+        // Raw timestamps are divided by 1000 (presumably milliseconds) to match the epoch-second bounds.
+        long effectiveStart = firstTime / 1000;
+        // GreptimeDB's `end` is treated as exclusive here, so pad it by 4 hours (the largest step
+        // getTimeStep returns) to keep the last raw point inside the evaluated range.
+        long effectiveEnd = lastTime / 1000 + Duration.ofHours(4).getSeconds();
+
+        StringBuilder selectorBuilder = new StringBuilder(getTableName(metrics))
+                .append('{').append(LABEL_KEY_INSTANCE).append("=\"").append(monitorId).append('"');
+        if (!CommonConstants.PROMETHEUS.equals(app)) {
+            selectorBuilder.append(',').append(LABEL_KEY_FIELD).append("=\"").append(metric).append('"');
+        }
+        String selector = selectorBuilder.append('}').toString();
+
+        // Aggregate function i fills the field written by setter i.
+        List<String> functions = List.of("max_over_time", "min_over_time", "avg_over_time");
+        List<BiConsumer<Value, String>> setters = List.of(Value::setMax, Value::setMin, Value::setMean);
+        for (int i = 0; i < functions.size(); i++) {
+            String promQl = functions.get(i) + "(" + selector + "[" + step + "])";
+            // Each aggregate is requested independently so one failing request does not skip the others.
+            try {
+                URI uri = getUri(effectiveStart, effectiveEnd, step, ignored -> promQl);
+                requestIntervalMetricAndPutValue(uri, instanceValuesMap, setters.get(i));
+            } catch (Exception e) {
+                log.error("query interval metrics data from greptime error. {}", e.getMessage(), e);
+            }
+        }
+
+        return instanceValuesMap;
+    }
+
+    /**
+     * Resolves the query window for a history expression.
+     *
+     * <p>A numeric {@code history} is a look-back length in seconds; anything else is parsed by
+     * {@code TimePeriodUtil.parseTokenTime}. If the value cannot be resolved, the window falls back to
+     * the last 6 hours.
+     *
+     * @param history history range
+     * @return map with the window start under {@code LABEL_KEY_START_TIME} and the window end under
+     *         {@code LABEL_KEY_END_TIME}, both in epoch seconds
+     */
+    private Map<String, Long> getTimeRange(String history) {
+        // Read the clock once so start and end are derived from the same instant.
+        Instant now = Instant.now();
+        long end = now.getEpochSecond();
+        long start;
+        try {
+            if (NumberUtils.isParsable(history)) {
+                start = end - NumberUtils.toLong(history);
+            } else {
+                TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(history);
+                // Explicit check: `assert` is disabled in production, so it cannot guard this.
+                if (temporalAmount == null) {
+                    throw new IllegalArgumentException("unsupported history: " + history);
+                }
+                start = now.minus(temporalAmount).getEpochSecond();
+            }
+        } catch (Exception e) {
+            log.error("history time error: {}. use default: 6h", e.getMessage());
+            start = now.minus(6, ChronoUnit.HOURS).getEpochSecond();
+        }
+        // Use the same key constants that callers read with.
+        return Map.of(LABEL_KEY_START_TIME, start, LABEL_KEY_END_TIME, end);
+    }
+
+    /**
+     * Picks the range-query step for a window: "4h" for seven days or more, "1h" for more than one
+     * day but less than seven, and "60s" otherwise.
+     *
+     * @param start window start, in seconds
+     * @param end window end, in seconds
+     * @return step expression for the query API
+     */
+    private String getTimeStep(long start, long end) {
+        long span = end - start;
+        if (span >= Duration.ofDays(7).getSeconds()) {
+            return "4h";
+        }
+        if (span > Duration.ofDays(1).getSeconds()) {
+            return "1h";
+        }
+        return "60s";
+    }
+
+ /**
+ * Get history metric data
+ *
+ * @param start start time
+ * @param end end time
+ * @param step step
+ * @param monitorId monitor id
+ * @param app monitor type
+ * @param metrics metrics
+ * @param metric metric
+ * @return history metric data
+ */
+ private Map<String, List<Value>> getHistoryData(long start, long end, String step, Long monitorId, String app, String metrics, String metric) {
String name = getTableName(metrics);
String timeSeriesSelector = LABEL_KEY_NAME + "=\"" + name + "\""
+ "," + LABEL_KEY_INSTANCE + "=\"" + monitorId + "\"";
if (!CommonConstants.PROMETHEUS.equals(app)) {
timeSeriesSelector = timeSeriesSelector + "," + LABEL_KEY_FIELD + "=\"" + metric + "\"";
}
+
Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
try {
- HttpHeaders headers = new HttpHeaders();
- headers.setContentType(MediaType.APPLICATION_JSON);
- headers.setAccept(List.of(MediaType.APPLICATION_JSON));
- if (StringUtils.hasText(greptimeProperties.username())
- && StringUtils.hasText(greptimeProperties.password())) {
- String authStr = greptimeProperties.username() + ":" + greptimeProperties.password();
- String encodedAuth = Base64Util.encode(authStr);
- headers.add(HttpHeaders.AUTHORIZATION, BASIC + " " + encodedAuth);
- }
- Instant now = Instant.now();
- long start;
- try {
- if (NumberUtils.isParsable(history)) {
- start = NumberUtils.toLong(history);
- start = (ZonedDateTime.now().toEpochSecond() - start);
- } else {
- TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(history);
- assert temporalAmount != null;
- Instant dateTime = now.minus(temporalAmount);
- start = dateTime.getEpochSecond();
+ HttpEntity<Void> httpEntity = getHttpEntity();
+
+ String finalTimeSeriesSelector = timeSeriesSelector;
+ URI uri = getUri(start, end, step, uriComponents -> {
+ MultiValueMap<String, String> queryParams = uriComponents.getQueryParams();
+ if (!queryParams.isEmpty()) {
+ return "{" + finalTimeSeriesSelector + "}";
}
- } catch (Exception e) {
- log.error("history time error: {}. use default: 6h", e.getMessage());
- start = now.minus(6, ChronoUnit.HOURS).getEpochSecond();
+ return null;
+ });
+
+ ResponseEntity<PromQlQueryContent> responseEntity = null;
+ if (uri != null) {
+ responseEntity = restTemplate.exchange(uri,
+ HttpMethod.GET, httpEntity, PromQlQueryContent.class);
}
-
- long end = now.getEpochSecond();
- String step = "60s";
- if (end - start < Duration.ofDays(7).getSeconds() && end - start > Duration.ofDays(1).getSeconds()) {
- step = "1h";
- } else if (end - start >= Duration.ofDays(7).getSeconds()) {
- step = "4h";
- }
-
- HttpEntity<Void> httpEntity = new HttpEntity<>(headers);
- URI uri = UriComponentsBuilder.fromUriString(greptimeProperties.httpEndpoint() + QUERY_RANGE_PATH)
- .queryParam(URLEncoder.encode("query", StandardCharsets.UTF_8), URLEncoder.encode("{" + timeSeriesSelector + "}", StandardCharsets.UTF_8))
- .queryParam("start", start)
- .queryParam("end", end)
- .queryParam("step", step)
- .queryParam("db", greptimeProperties.database())
- .build(true).toUri();
-
- ResponseEntity<PromQlQueryContent> responseEntity = restTemplate.exchange(uri,
- HttpMethod.GET, httpEntity, PromQlQueryContent.class);
- if (responseEntity.getStatusCode().is2xxSuccessful()) {
+ if (responseEntity != null && responseEntity.getStatusCode().is2xxSuccessful()) {
log.debug("query metrics data from greptime success. {}", uri);
if (responseEntity.getBody() != null && responseEntity.getBody().getData() != null
&& responseEntity.getBody().getData().getResult() != null) {
@@ -279,21 +373,101 @@
log.error("query metrics data from greptime failed. {}", responseEntity);
}
} catch (Exception e) {
- log.error(e.getMessage(), e);
+ log.error("query metrics data from greptime error. {}", e.getMessage(), e);
}
return instanceValuesMap;
}
- private String getTableName(String metrics) {
- return metrics;
+    /**
+     * Builds the header-only request entity used for the PromQL HTTP queries: JSON content type and
+     * accept headers, plus an Authorization header ({@code BASIC} followed by the Base64-encoded
+     * "username:password") when both a username and a password are configured.
+     *
+     * @return request entity carrying only headers
+     */
+    private HttpEntity<Void> getHttpEntity() {
+        HttpHeaders requestHeaders = new HttpHeaders();
+        requestHeaders.setContentType(MediaType.APPLICATION_JSON);
+        requestHeaders.setAccept(List.of(MediaType.APPLICATION_JSON));
+        String user = greptimeProperties.username();
+        String secret = greptimeProperties.password();
+        if (StringUtils.hasText(user) && StringUtils.hasText(secret)) {
+            requestHeaders.add(HttpHeaders.AUTHORIZATION, BASIC + " " + Base64Util.encode(user + ":" + secret));
+        }
+        return new HttpEntity<>(requestHeaders);
}
- @Override
- public Map<String, List<Value>> getHistoryIntervalMetricData(Long monitorId, String app, String metrics,
- String metric, String label, String history) {
- return getHistoryMetricData(monitorId, app, metrics, metric, label, history);
+    /**
+     * Builds the range-query URI for the configured HTTP endpoint and database.
+     *
+     * <p>The callback receives a copy of the URI components already carrying start, end, step and db,
+     * and returns the PromQL expression to send. A null or blank expression means no request should be
+     * made, and null is returned.
+     *
+     * @param start start time
+     * @param end end time
+     * @param step interval
+     * @param queryFunction produces the PromQL expression from the partially built URI components
+     * @return the request URI, or null when the callback yields no expression
+     */
+    private URI getUri(long start, long end, String step, Function<UriComponents, String> queryFunction) {
+        UriComponentsBuilder builder = UriComponentsBuilder
+                .fromUriString(greptimeProperties.httpEndpoint() + QUERY_RANGE_PATH)
+                .queryParam("start", start)
+                .queryParam("end", end)
+                .queryParam("step", step)
+                .queryParam("db", greptimeProperties.database());
+        String promQl = queryFunction.apply(builder.cloneBuilder().build(true));
+        if (StringUtils.hasText(promQl)) {
+            // Values are pre-encoded, hence build(true).
+            String encodedName = URLEncoder.encode("query", StandardCharsets.UTF_8);
+            String encodedValue = URLEncoder.encode(promQl, StandardCharsets.UTF_8);
+            return builder.queryParam(encodedName, encodedValue).build(true).toUri();
+        }
+        return null;
}
-
+
+    /**
+     * Runs one aggregate range query and writes each returned point into the matching raw value.
+     *
+     * <p>Each result series is matched to an entry of {@code instanceValuesMap} by the JSON of its labels,
+     * with the name and instance labels removed. Points are paired with raw values by position, and only
+     * when both sides have the same length; otherwise the series is left untouched. Series with no raw
+     * counterpart are ignored and do not add entries to the map. Points whose value is missing or
+     * non-numeric are skipped, leaving that value's field unset.
+     *
+     * @param uri request URI; null means there is nothing to query
+     * @param instanceValuesMap raw series keyed by label JSON; values are updated in place
+     * @param valueConsumer setter receiving the target value and the point value rounded to at most 4 decimals
+     */
+    private void requestIntervalMetricAndPutValue(URI uri, Map<String, List<Value>> instanceValuesMap, BiConsumer<Value, String> valueConsumer) {
+        if (uri == null) {
+            return;
+        }
+        ResponseEntity<PromQlQueryContent> responseEntity = restTemplate.exchange(uri,
+                HttpMethod.GET, getHttpEntity(), PromQlQueryContent.class);
+        if (!responseEntity.getStatusCode().is2xxSuccessful()) {
+            log.error("query interval metrics data from greptime failed. {}", responseEntity);
+            return;
+        }
+        log.debug("query interval metrics data from greptime success. {}", uri);
+        PromQlQueryContent body = responseEntity.getBody();
+        if (body == null || body.getData() == null || body.getData().getResult() == null) {
+            return;
+        }
+        for (PromQlQueryContent.ContentData.Content content : body.getData().getResult()) {
+            var points = content.getValues();
+            if (points == null || points.isEmpty()) {
+                continue;
+            }
+            Map<String, String> labels = content.getMetric();
+            if (labels == null) {
+                labels = new HashMap<>();
+            }
+            labels.remove(LABEL_KEY_NAME);
+            labels.remove(LABEL_KEY_INSTANCE);
+            // Plain lookup rather than computeIfAbsent: an unmatched series must not add an empty entry.
+            List<Value> valueList = instanceValuesMap.get(JsonUtil.toJson(labels));
+            if (valueList == null || valueList.size() != points.size()) {
+                continue;
+            }
+            // Iterate the value list instead of get(i): it may be a LinkedList, where get(i) is O(n).
+            int index = 0;
+            for (Value value : valueList) {
+                String pointValue = formatPointValue(points.get(index++));
+                if (pointValue != null) {
+                    valueConsumer.accept(value, pointValue);
+                }
+            }
+        }
+    }
+
+    /**
+     * Formats the value element (index 1) of a {@code [timestamp, value]} point to at most 4 decimal places.
+     *
+     * @param point result point
+     * @return formatted value, or null when the value is absent or not a finite decimal (for example "NaN")
+     */
+    private static String formatPointValue(Object[] point) {
+        if (point == null || point.length < 2 || point[1] == null) {
+            return null;
+        }
+        try {
+            return new BigDecimal(String.valueOf(point[1])).setScale(4, RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
+        } catch (NumberFormatException e) {
+            return null;
+        }
+    }
+
@Override
public void destroy() {
if (this.greptimeDb != null) {
@@ -359,14 +533,14 @@
}
@Override
- public List<LogEntry> queryLogsByMultipleConditions(Long startTime, Long endTime, String traceId,
- String spanId, Integer severityNumber,
+ public List<LogEntry> queryLogsByMultipleConditions(Long startTime, Long endTime, String traceId,
+ String spanId, Integer severityNumber,
String severityText) {
try {
StringBuilder sql = new StringBuilder("SELECT * FROM ").append(LOG_TABLE_NAME);
buildWhereConditions(sql, startTime, endTime, traceId, spanId, severityNumber, severityText);
sql.append(" ORDER BY time_unix_nano DESC");
-
+
List<Map<String, Object>> rows = greptimeSqlQueryExecutor.execute(sql.toString());
return mapRowsToLogEntries(rows);
} catch (Exception e) {
@@ -376,14 +550,14 @@
}
@Override
- public List<LogEntry> queryLogsByMultipleConditionsWithPagination(Long startTime, Long endTime, String traceId,
- String spanId, Integer severityNumber,
+ public List<LogEntry> queryLogsByMultipleConditionsWithPagination(Long startTime, Long endTime, String traceId,
+ String spanId, Integer severityNumber,
String severityText, Integer offset, Integer limit) {
try {
StringBuilder sql = new StringBuilder("SELECT * FROM ").append(LOG_TABLE_NAME);
buildWhereConditions(sql, startTime, endTime, traceId, spanId, severityNumber, severityText);
sql.append(" ORDER BY time_unix_nano DESC");
-
+
// Add pagination
if (limit != null && limit > 0) {
sql.append(" LIMIT ").append(limit);
@@ -391,7 +565,7 @@
sql.append(" OFFSET ").append(offset);
}
}
-
+
List<Map<String, Object>> rows = greptimeSqlQueryExecutor.execute(sql.toString());
return mapRowsToLogEntries(rows);
} catch (Exception e) {
@@ -401,13 +575,13 @@
}
@Override
- public long countLogsByMultipleConditions(Long startTime, Long endTime, String traceId,
- String spanId, Integer severityNumber,
+ public long countLogsByMultipleConditions(Long startTime, Long endTime, String traceId,
+ String spanId, Integer severityNumber,
String severityText) {
try {
StringBuilder sql = new StringBuilder("SELECT COUNT(*) as count FROM ").append(LOG_TABLE_NAME);
buildWhereConditions(sql, startTime, endTime, traceId, spanId, severityNumber, severityText);
-
+
List<Map<String, Object>> rows = greptimeSqlQueryExecutor.execute(sql.toString());
if (rows != null && !rows.isEmpty()) {
Object countObj = rows.get(0).get("count");
@@ -442,35 +616,35 @@
* @param spanId span id
* @param severityNumber severity number
*/
- private void buildWhereConditions(StringBuilder sql, Long startTime, Long endTime, String traceId,
+ private void buildWhereConditions(StringBuilder sql, Long startTime, Long endTime, String traceId,
String spanId, Integer severityNumber, String severityText) {
List<String> conditions = new ArrayList<>();
-
+
// Time range condition
if (startTime != null && endTime != null) {
conditions.add("time_unix_nano >= " + msToNs(startTime) + " AND time_unix_nano <= " + msToNs(endTime));
}
-
+
// TraceId condition
if (StringUtils.hasText(traceId)) {
conditions.add("trace_id = '" + safeString(traceId) + "'");
}
-
+
// SpanId condition
if (StringUtils.hasText(spanId)) {
conditions.add("span_id = '" + safeString(spanId) + "'");
}
-
+
// Severity condition
if (severityNumber != null) {
conditions.add("severity_number = " + severityNumber);
}
-
+
// SeverityText condition
if (StringUtils.hasText(severityText)) {
conditions.add("severity_text = '" + safeString(severityText) + "'");
}
-
+
// Add WHERE clause if there are conditions
if (!conditions.isEmpty()) {
sql.append(" WHERE ").append(String.join(" AND ", conditions));
@@ -582,11 +756,11 @@
.map(String::valueOf)
.collect(Collectors.joining(", ")));
sql.append(")");
-
+
greptimeSqlQueryExecutor.execute(sql.toString());
log.info("[warehouse greptime-log] Batch delete executed successfully for {} logs", timeUnixNanos.size());
return true;
-
+
} catch (Exception e) {
log.error("[warehouse greptime-log] batchDeleteLogs error: {}", e.getMessage(), e);
return false;