[improve] Optimize GreptimeDB time-series statistics. (#3776)

Co-authored-by: Calvin <zhengqiwei@apache.org>
diff --git a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
index a824111..adf4111 100644
--- a/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
+++ b/hertzbeat-warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/tsdb/greptime/GreptimeDbDataStorage.java
@@ -45,6 +45,8 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import lombok.extern.slf4j.Slf4j;
@@ -68,8 +70,10 @@
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
 import org.springframework.stereotype.Component;
+import org.springframework.util.MultiValueMap;
 import org.springframework.util.StringUtils;
 import org.springframework.web.client.RestTemplate;
+import org.springframework.web.util.UriComponents;
 import org.springframework.web.util.UriComponentsBuilder;
 
 /**
@@ -86,6 +90,8 @@
     private static final String LABEL_KEY_FIELD = "__field__";
     private static final String LABEL_KEY_INSTANCE = "instance";
     private static final String LOG_TABLE_NAME = "hertzbeat_logs";
+    private static final String LABEL_KEY_START_TIME = "start";
+    private static final String LABEL_KEY_END_TIME = "end";
 
     private GreptimeDB greptimeDb;
 
@@ -202,60 +208,148 @@
     @Override
     public Map<String, List<Value>> getHistoryMetricData(Long monitorId, String app, String metrics, String metric,
                                                          String label, String history) {
+        Map<String, Long> timeRange = getTimeRange(history);
+        Long start = timeRange.get(LABEL_KEY_START_TIME);
+        Long end = timeRange.get(LABEL_KEY_END_TIME);
+        // The sampling step is chosen from the width of the queried window.
+        String step = getTimeStep(start, end);
+        // `label` is not forwarded to the range query.
+        return getHistoryData(start, end, step, monitorId, app, metrics, metric);
+    }
+
+    private String getTableName(String metrics) {
+        return metrics; // the table name is the metrics name itself, unchanged
+    }
+
+    @Override
+    public Map<String, List<Value>> getHistoryIntervalMetricData(Long monitorId, String app, String metrics,
+                                                                 String metric, String label, String history) {
+        Map<String, Long> timeRange = getTimeRange(history);
+        Long start = timeRange.get(LABEL_KEY_START_TIME);
+        Long end = timeRange.get(LABEL_KEY_END_TIME);
+        String step = getTimeStep(start, end);
+
+        Map<String, List<Value>> instanceValuesMap = getHistoryData(start, end, step, monitorId, app, metrics, metric);
+        // getHistoryData returns an empty map when the query fails or yields no samples; indexing into it
+        // would throw, so return early when there is no non-empty series to take the effective range from.
+        List<Value> values = instanceValuesMap.values().stream()
+                .filter(series -> series != null && !series.isEmpty()).findFirst().orElse(null);
+        if (values == null) {
+            return instanceValuesMap;
+        }
+        // Aggregates queried beyond the range the samples actually cover may be inconsistent with them,
+        // so the range is restricted to the first and last returned timestamps (divided by 1000 for the query).
+        // Since greptime's `end` excludes the specified time, 4 hours are added to the upper bound.
+        long effectiveStart = values.get(0).getTime() / 1000;
+        long effectiveEnd = values.get(values.size() - 1).getTime() / 1000 + Duration.ofHours(4).getSeconds();
+
+        String name = getTableName(metrics);
+        String timeSeriesSelector = name + "{" + LABEL_KEY_INSTANCE + "=\"" + monitorId + "\"";
+        if (!CommonConstants.PROMETHEUS.equals(app)) {
+            timeSeriesSelector = timeSeriesSelector + "," + LABEL_KEY_FIELD + "=\"" + metric + "\"";
+        }
+        timeSeriesSelector = timeSeriesSelector + "}";
+
+        try {
+            // max, min and avg over each step window, merged into the samples fetched above
+            String finalTimeSeriesSelector = timeSeriesSelector;
+            URI uri = getUri(effectiveStart, effectiveEnd, step, uriComponents -> "max_over_time(" + finalTimeSeriesSelector + "[" + step + "])");
+            requestIntervalMetricAndPutValue(uri, instanceValuesMap, Value::setMax);
+            uri = getUri(effectiveStart, effectiveEnd, step, uriComponents -> "min_over_time(" + finalTimeSeriesSelector + "[" + step + "])");
+            requestIntervalMetricAndPutValue(uri, instanceValuesMap, Value::setMin);
+            uri = getUri(effectiveStart, effectiveEnd, step, uriComponents -> "avg_over_time(" + finalTimeSeriesSelector + "[" + step + "])");
+            requestIntervalMetricAndPutValue(uri, instanceValuesMap, Value::setMean);
+        } catch (Exception e) {
+            log.error("query interval metrics data from greptime error. {}", e.getMessage(), e);
+        }
+        return instanceValuesMap;
+    }
+
+    /**
+     * Resolve the query window for a history expression; both bounds derive from a single clock reading.
+     *
+     * @param history number of seconds to look back, or a period token accepted by {@code TimePeriodUtil.parseTokenTime}
+     * @return epoch-second bounds stored under {@code LABEL_KEY_START_TIME} and {@code LABEL_KEY_END_TIME}
+     */
+    private Map<String, Long> getTimeRange(String history) {
+        Instant now = Instant.now();
+        long end = now.getEpochSecond();
+        long start;
+        try {
+            if (NumberUtils.isParsable(history)) {
+                start = end - NumberUtils.toLong(history);
+            } else {
+                TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(history);
+                if (temporalAmount == null) {
+                    // An `assert` here would raise an Error under -ea, which skips the 6h fallback below.
+                    throw new IllegalArgumentException("unparsable history: " + history);
+                }
+                start = now.minus(temporalAmount).getEpochSecond();
+            }
+        } catch (Exception e) {
+            log.error("history time error: {}. use default: 6h", e.getMessage());
+            start = now.minus(6, ChronoUnit.HOURS).getEpochSecond();
+        }
+        return Map.of(LABEL_KEY_START_TIME, start, LABEL_KEY_END_TIME, end);
+    }
+
+    /**
+     * Choose the sampling step for a range query from the width of the window.
+     *
+     * @param start window start, in seconds
+     * @param end   window end, in seconds
+     * @return "4h" for 7 days or more, "1h" for more than 1 day, otherwise "60s"
+     */
+    private String getTimeStep(long start, long end) {
+        long span = end - start;
+        if (span >= Duration.ofDays(7).getSeconds()) {
+            return "4h";
+        }
+        if (span > Duration.ofDays(1).getSeconds()) {
+            return "1h";
+        }
+        return "60s";
+    }
+
+    /**
+     * Get history metric data
+     *
+     * @param start     start time
+     * @param end       end time
+     * @param step      step
+     * @param monitorId monitor id
+     * @param app       monitor type
+     * @param metrics   metrics
+     * @param metric    metric
+     * @return history metric data
+     */
+    private Map<String, List<Value>> getHistoryData(long start, long end, String step, Long monitorId, String app, String metrics, String metric) {
         String name = getTableName(metrics);
         String timeSeriesSelector = LABEL_KEY_NAME + "=\"" + name + "\""
                 + "," + LABEL_KEY_INSTANCE + "=\"" + monitorId + "\"";
         if (!CommonConstants.PROMETHEUS.equals(app)) {
             timeSeriesSelector = timeSeriesSelector + "," + LABEL_KEY_FIELD + "=\"" + metric + "\"";
         }
+
         Map<String, List<Value>> instanceValuesMap = new HashMap<>(8);
         try {
-            HttpHeaders headers = new HttpHeaders();
-            headers.setContentType(MediaType.APPLICATION_JSON);
-            headers.setAccept(List.of(MediaType.APPLICATION_JSON));
-            if (StringUtils.hasText(greptimeProperties.username())
-                    && StringUtils.hasText(greptimeProperties.password())) {
-                String authStr = greptimeProperties.username() + ":" + greptimeProperties.password();
-                String encodedAuth = Base64Util.encode(authStr);
-                headers.add(HttpHeaders.AUTHORIZATION, BASIC + " " + encodedAuth);
-            }
-            Instant now = Instant.now();
-            long start;
-            try {
-                if (NumberUtils.isParsable(history)) {
-                    start = NumberUtils.toLong(history);
-                    start = (ZonedDateTime.now().toEpochSecond() - start);
-                } else {
-                    TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(history);
-                    assert temporalAmount != null;
-                    Instant dateTime = now.minus(temporalAmount);
-                    start = dateTime.getEpochSecond();
+            HttpEntity<Void> httpEntity = getHttpEntity();
+
+            String finalTimeSeriesSelector = timeSeriesSelector;
+            URI uri = getUri(start, end, step, uriComponents -> {
+                MultiValueMap<String, String> queryParams = uriComponents.getQueryParams();
+                if (!queryParams.isEmpty()) {
+                    return "{" + finalTimeSeriesSelector + "}";
                 }
-            } catch (Exception e) {
-                log.error("history time error: {}. use default: 6h", e.getMessage());
-                start = now.minus(6, ChronoUnit.HOURS).getEpochSecond();
+                return null;
+            });
+
+            ResponseEntity<PromQlQueryContent> responseEntity = null;
+            if (uri != null) {
+                responseEntity = restTemplate.exchange(uri,
+                        HttpMethod.GET, httpEntity, PromQlQueryContent.class);
             }
-
-            long end = now.getEpochSecond();
-            String step = "60s";
-            if (end - start < Duration.ofDays(7).getSeconds() && end - start > Duration.ofDays(1).getSeconds()) {
-                step = "1h";
-            } else if (end - start >= Duration.ofDays(7).getSeconds()) {
-                step = "4h";
-            }
-
-            HttpEntity<Void> httpEntity = new HttpEntity<>(headers);
-            URI uri = UriComponentsBuilder.fromUriString(greptimeProperties.httpEndpoint() + QUERY_RANGE_PATH)
-                    .queryParam(URLEncoder.encode("query", StandardCharsets.UTF_8), URLEncoder.encode("{" + timeSeriesSelector + "}", StandardCharsets.UTF_8))
-                    .queryParam("start", start)
-                    .queryParam("end", end)
-                    .queryParam("step", step)
-                    .queryParam("db", greptimeProperties.database())
-                    .build(true).toUri();
-
-            ResponseEntity<PromQlQueryContent> responseEntity = restTemplate.exchange(uri,
-                    HttpMethod.GET, httpEntity, PromQlQueryContent.class);
-            if (responseEntity.getStatusCode().is2xxSuccessful()) {
+            if (responseEntity != null && responseEntity.getStatusCode().is2xxSuccessful()) {
                 log.debug("query metrics data from greptime success. {}", uri);
                 if (responseEntity.getBody() != null && responseEntity.getBody().getData() != null
                         && responseEntity.getBody().getData().getResult() != null) {
@@ -279,21 +373,101 @@
                 log.error("query metrics data from greptime failed. {}", responseEntity);
             }
         } catch (Exception e) {
-            log.error(e.getMessage(), e);
+            log.error("query metrics data from greptime error. {}", e.getMessage(), e);
         }
         return instanceValuesMap;
     }
 
-    private String getTableName(String metrics) {
-        return metrics;
+    /**
+     * Build the header-only request entity used for the GreptimeDB HTTP queries.
+     *
+     * @return entity with JSON content-type/accept headers, plus Basic auth when username and password both have text
+     */
+    private HttpEntity<Void> getHttpEntity() {
+        HttpHeaders httpHeaders = new HttpHeaders();
+        httpHeaders.setContentType(MediaType.APPLICATION_JSON);
+        httpHeaders.setAccept(List.of(MediaType.APPLICATION_JSON));
+        // Credentials are optional: the header is only sent when both parts contain text.
+        String username = greptimeProperties.username();
+        if (StringUtils.hasText(username) && StringUtils.hasText(greptimeProperties.password())) {
+            String credentials = username + ":" + greptimeProperties.password();
+            httpHeaders.add(HttpHeaders.AUTHORIZATION, BASIC + " " + Base64Util.encode(credentials));
+        }
+        return new HttpEntity<>(httpHeaders);
     }
 
-    @Override
-    public Map<String, List<Value>> getHistoryIntervalMetricData(Long monitorId, String app, String metrics,
-                                                                 String metric, String label, String history) {
-        return getHistoryMetricData(monitorId, app, metrics, metric, label, history);
+    /**
+     * Build the range-query URI for GreptimeDB.
+     *
+     * @param start         value of the {@code start} parameter
+     * @param end           value of the {@code end} parameter
+     * @param step          value of the {@code step} parameter
+     * @param queryFunction produces the query expression from the base components; a blank result aborts the build
+     * @return the URI, or {@code null} when {@code queryFunction} yields no text
+     */
+    private URI getUri(long start, long end, String step, Function<UriComponents, String> queryFunction) {
+        String endpoint = greptimeProperties.httpEndpoint() + QUERY_RANGE_PATH;
+        UriComponentsBuilder builder = UriComponentsBuilder.fromUriString(endpoint)
+                .queryParam("start", start)
+                .queryParam("end", end)
+                .queryParam("step", step)
+                .queryParam("db", greptimeProperties.database());
+        // The callback receives components built from a clone of the base builder.
+        String queryValue = queryFunction.apply(builder.cloneBuilder().build(true));
+        if (!StringUtils.hasText(queryValue)) {
+            return null;
+        }
+        String encodedName = URLEncoder.encode("query", StandardCharsets.UTF_8);
+        String encodedQuery = URLEncoder.encode(queryValue, StandardCharsets.UTF_8);
+        // build(true) treats the components as already encoded, hence the explicit URLEncoder calls.
+        return builder.queryParam(encodedName, encodedQuery)
+                .build(true).toUri();
     }
-    
+
+    /**
+     * Query one set of aggregate series from GreptimeDB and merge it into the already-fetched samples.
+     *
+     * @param uri               range-query URI; {@code null} makes this a no-op
+     * @param instanceValuesMap samples keyed by JSON-encoded labels; only existing, same-sized entries are updated
+     * @param valueConsumer     setter applied to each sample with the aggregate rounded to at most 4 decimals
+     */
+    private void requestIntervalMetricAndPutValue(URI uri, Map<String, List<Value>> instanceValuesMap, BiConsumer<Value, String> valueConsumer) {
+        if (uri == null) {
+            return;
+        }
+        ResponseEntity<PromQlQueryContent> responseEntity = restTemplate.exchange(uri,
+                HttpMethod.GET, getHttpEntity(), PromQlQueryContent.class);
+        if (!responseEntity.getStatusCode().is2xxSuccessful()) {
+            log.error("query interval metrics data from greptime failed. {}", responseEntity);
+            return;
+        }
+        log.debug("query interval metrics data from greptime success. {}", uri);
+        PromQlQueryContent body = responseEntity.getBody();
+        if (body == null || body.getData() == null || body.getData().getResult() == null) {
+            return;
+        }
+        for (PromQlQueryContent.ContentData.Content content : body.getData().getResult()) {
+            Map<String, String> labels = content.getMetric();
+            if (labels == null || content.getValues() == null || content.getValues().isEmpty()) {
+                continue;
+            }
+            labels.remove(LABEL_KEY_NAME);
+            labels.remove(LABEL_KEY_INSTANCE);
+            // Look up only: a series absent from the base result must not be added to the map as an empty list.
+            List<Value> valueList = instanceValuesMap.get(JsonUtil.toJson(labels));
+            if (valueList == null || valueList.size() != content.getValues().size()) {
+                continue;
+            }
+            // Iterate instead of indexing: valueList may be a LinkedList, where get(i) is linear.
+            int sampleIndex = 0;
+            for (Value value : valueList) {
+                Object[] sample = content.getValues().get(sampleIndex++);
+                String aggregated = new BigDecimal(String.valueOf(sample[1])).setScale(4, RoundingMode.HALF_UP).stripTrailingZeros().toPlainString();
+                valueConsumer.accept(value, aggregated);
+            }
+        }
+    }
+
     @Override
     public void destroy() {
         if (this.greptimeDb != null) {
@@ -359,14 +533,14 @@
     }
 
     @Override
-    public List<LogEntry> queryLogsByMultipleConditions(Long startTime, Long endTime, String traceId, 
-                                                        String spanId, Integer severityNumber, 
+    public List<LogEntry> queryLogsByMultipleConditions(Long startTime, Long endTime, String traceId,
+                                                        String spanId, Integer severityNumber,
                                                         String severityText) {
         try {
             StringBuilder sql = new StringBuilder("SELECT * FROM ").append(LOG_TABLE_NAME);
             buildWhereConditions(sql, startTime, endTime, traceId, spanId, severityNumber, severityText);
             sql.append(" ORDER BY time_unix_nano DESC");
-            
+
             List<Map<String, Object>> rows = greptimeSqlQueryExecutor.execute(sql.toString());
             return mapRowsToLogEntries(rows);
         } catch (Exception e) {
@@ -376,14 +550,14 @@
     }
 
     @Override
-    public List<LogEntry> queryLogsByMultipleConditionsWithPagination(Long startTime, Long endTime, String traceId, 
-                                                                      String spanId, Integer severityNumber, 
+    public List<LogEntry> queryLogsByMultipleConditionsWithPagination(Long startTime, Long endTime, String traceId,
+                                                                      String spanId, Integer severityNumber,
                                                                       String severityText, Integer offset, Integer limit) {
         try {
             StringBuilder sql = new StringBuilder("SELECT * FROM ").append(LOG_TABLE_NAME);
             buildWhereConditions(sql, startTime, endTime, traceId, spanId, severityNumber, severityText);
             sql.append(" ORDER BY time_unix_nano DESC");
-            
+
             // Add pagination
             if (limit != null && limit > 0) {
                 sql.append(" LIMIT ").append(limit);
@@ -391,7 +565,7 @@
                     sql.append(" OFFSET ").append(offset);
                 }
             }
-            
+
             List<Map<String, Object>> rows = greptimeSqlQueryExecutor.execute(sql.toString());
             return mapRowsToLogEntries(rows);
         } catch (Exception e) {
@@ -401,13 +575,13 @@
     }
 
     @Override
-    public long countLogsByMultipleConditions(Long startTime, Long endTime, String traceId, 
-                                             String spanId, Integer severityNumber, 
+    public long countLogsByMultipleConditions(Long startTime, Long endTime, String traceId,
+                                             String spanId, Integer severityNumber,
                                              String severityText) {
         try {
             StringBuilder sql = new StringBuilder("SELECT COUNT(*) as count FROM ").append(LOG_TABLE_NAME);
             buildWhereConditions(sql, startTime, endTime, traceId, spanId, severityNumber, severityText);
-            
+
             List<Map<String, Object>> rows = greptimeSqlQueryExecutor.execute(sql.toString());
             if (rows != null && !rows.isEmpty()) {
                 Object countObj = rows.get(0).get("count");
@@ -442,35 +616,35 @@
      * @param spanId span id
      * @param severityNumber severity number
      */
-    private void buildWhereConditions(StringBuilder sql, Long startTime, Long endTime, String traceId, 
+    private void buildWhereConditions(StringBuilder sql, Long startTime, Long endTime, String traceId,
                                      String spanId, Integer severityNumber, String severityText) {
         List<String> conditions = new ArrayList<>();
-        
+
         // Time range condition
         if (startTime != null && endTime != null) {
             conditions.add("time_unix_nano >= " + msToNs(startTime) + " AND time_unix_nano <= " + msToNs(endTime));
         }
-        
+
         // TraceId condition
         if (StringUtils.hasText(traceId)) {
             conditions.add("trace_id = '" + safeString(traceId) + "'");
         }
-        
+
         // SpanId condition
         if (StringUtils.hasText(spanId)) {
             conditions.add("span_id = '" + safeString(spanId) + "'");
         }
-        
+
         // Severity condition
         if (severityNumber != null) {
             conditions.add("severity_number = " + severityNumber);
         }
-        
+
         // SeverityText condition
         if (StringUtils.hasText(severityText)) {
             conditions.add("severity_text = '" + safeString(severityText) + "'");
         }
-        
+
         // Add WHERE clause if there are conditions
         if (!conditions.isEmpty()) {
             sql.append(" WHERE ").append(String.join(" AND ", conditions));
@@ -582,11 +756,11 @@
                     .map(String::valueOf)
                     .collect(Collectors.joining(", ")));
             sql.append(")");
-            
+
             greptimeSqlQueryExecutor.execute(sql.toString());
             log.info("[warehouse greptime-log] Batch delete executed successfully for {} logs", timeUnixNanos.size());
             return true;
-            
+
         } catch (Exception e) {
             log.error("[warehouse greptime-log] batchDeleteLogs error: {}", e.getMessage(), e);
             return false;