blob: 6d194ee2eec86445bceab630e8d1b3c33c0e7ac0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.RangeQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.Aggregation;
import org.apache.skywalking.library.elasticsearch.requests.search.aggregation.TermsAggregationBuilder;
import org.apache.skywalking.library.elasticsearch.response.Document;
import org.apache.skywalking.library.elasticsearch.response.Documents;
import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
import org.apache.skywalking.library.elasticsearch.response.search.SearchHits;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.analysis.metrics.DataTable;
import org.apache.skywalking.oap.server.core.analysis.metrics.HistogramMetrics;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.query.PointOfTime;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.input.MetricsCondition;
import org.apache.skywalking.oap.server.core.query.sql.Function;
import org.apache.skywalking.oap.server.core.query.type.HeatMap;
import org.apache.skywalking.oap.server.core.query.type.IntValues;
import org.apache.skywalking.oap.server.core.query.type.KVInt;
import org.apache.skywalking.oap.server.core.query.type.MetricsValues;
import org.apache.skywalking.oap.server.core.storage.annotation.ValueColumnMetadata;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeRangeIndexNameGenerator;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.TimeSeriesUtils;
/**
 * Elasticsearch implementation of {@link IMetricsQueryDAO}.
 *
 * <p>A single aggregated value is computed with a terms aggregation on {@link Metrics#ENTITY_ID}
 * (unless the metric's value function is {@link Function#Latest}). Time series, labeled values and
 * heat maps are read by fetching one document per point of the requested duration by document id.
 */
public class MetricsQueryEsDAO extends EsDAO implements IMetricsQueryDAO {

    public MetricsQueryEsDAO(ElasticSearchClient client) {
        super(client);
    }

    /**
     * Reads one aggregated value of a metric over the given duration.
     *
     * <p>When the metric's value function is {@link Function#Latest} the value is taken from
     * {@link #readMetricsValues}; otherwise a terms aggregation (size 1) on the entity id with a
     * sub-aggregation chosen by {@link #functionAggregation} is executed.
     *
     * @param condition       metric name and entity to query
     * @param valueColumnName column holding the metric value; also used as the sub-aggregation name
     * @param duration        time range of the query
     * @return the aggregated value, or the metric's default value when the response carries no
     *         usable bucket/value
     */
    @Override
    public long readMetricsValue(final MetricsCondition condition,
                                 final String valueColumnName,
                                 final Duration duration) {
        final int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
        final Function function = ValueColumnMetadata.INSTANCE.getValueFunction(condition.getName());
        if (function == Function.Latest) {
            return readMetricsValues(condition, valueColumnName, duration)
                .getValues().latestValue(defaultValue);
        }

        // The search request is only needed on the aggregation path, so build it after the
        // Latest short-circuit above.
        final SearchBuilder sourceBuilder = buildQuery(condition, duration);
        final TermsAggregationBuilder entityIdAggregation =
            Aggregation.terms(Metrics.ENTITY_ID)
                       .field(Metrics.ENTITY_ID)
                       .executionHint(TermsAggregationBuilder.ExecutionHint.MAP)
                       .collectMode(TermsAggregationBuilder.CollectMode.BREADTH_FIRST)
                       .size(1);
        functionAggregation(function, entityIdAggregation, valueColumnName);
        sourceBuilder.aggregation(entityIdAggregation);

        final SearchResponse response = getClient().search(
            new TimeRangeIndexNameGenerator(
                IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName()),
                duration.getStartTimeBucketInSec(),
                duration.getEndTimeBucketInSec()),
            sourceBuilder.build());

        return firstBucketValue(response, valueColumnName, defaultValue);
    }

    /**
     * Extracts the sub-aggregation value of the first entity-id bucket from a search response.
     *
     * @param response        search response expected to carry the {@link Metrics#ENTITY_ID} terms
     *                        aggregation
     * @param valueColumnName name of the sub-aggregation inside each bucket
     * @param defaultValue    value returned when aggregations, buckets or the value are missing
     * @return the first bucket's value as a long, or {@code defaultValue}
     */
    @SuppressWarnings("unchecked") // The aggregation section is an untyped JSON-like map tree.
    private long firstBucketValue(final SearchResponse response,
                                  final String valueColumnName,
                                  final long defaultValue) {
        if (response.getAggregations() == null) {
            return defaultValue;
        }
        final Map<String, Object> idTerms =
            (Map<String, Object>) response.getAggregations().get(Metrics.ENTITY_ID);
        if (idTerms == null) {
            return defaultValue;
        }
        final List<Map<String, Object>> buckets =
            (List<Map<String, Object>>) idTerms.get("buckets");
        if (buckets == null || buckets.isEmpty()) {
            return defaultValue;
        }
        final Map<String, Object> agg =
            (Map<String, Object>) buckets.get(0).get(valueColumnName);
        if (agg == null) {
            return defaultValue;
        }
        // A metric aggregation may report a null "value"; fall back instead of failing with an NPE.
        final Object value = agg.get("value");
        return value == null ? defaultValue : ((Number) value).longValue();
    }

    /**
     * Reads the value of a metric for every point of the given duration.
     *
     * <p>One document id is derived per point and the documents are fetched by id. A point whose
     * document is missing gets the metric's default value; a document lacking the value column
     * (or holding {@code null} in it) yields {@code 0}. No label is set on the returned values.
     *
     * @param condition       metric name and entity to query
     * @param valueColumnName column holding the metric value
     * @param duration        time range of the query
     * @return the values, passed through {@code Util.sortValues} with the expected ids
     */
    @Override
    public MetricsValues readMetricsValues(final MetricsCondition condition,
                                           final String valueColumnName,
                                           final Duration duration) {
        final String tableName =
            IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
        final boolean useGeneratedDocId =
            IndexController.LogicIndicesRegister.isPhysicalTable(condition.getName());
        final Map<String, List<String>> indexIdsGroup = new HashMap<>();
        final List<String> ids =
            groupDocIdsByIndex(condition, duration, tableName, useGeneratedDocId, indexIdsGroup);
        final int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());

        final MetricsValues metricsValues = new MetricsValues();
        final Optional<Documents> response = getClient().ids(indexIdsGroup);
        if (response.isPresent()) {
            final Map<String, Map<String, Object>> idMap = toMap(response.get());
            final IntValues intValues = metricsValues.getValues();
            for (final String id : ids) {
                final KVInt kvInt = new KVInt();
                kvInt.setId(id);
                final Map<String, Object> source = idMap.get(id);
                if (source == null) {
                    kvInt.setValue(defaultValue);
                } else {
                    final Object raw = source.get(valueColumnName);
                    kvInt.setValue(raw == null ? 0 : ((Number) raw).longValue());
                }
                intValues.addKVInt(kvInt);
            }
        }

        metricsValues.setValues(Util.sortValues(metricsValues.getValues(), ids, defaultValue));
        return metricsValues;
    }

    /**
     * Reads labeled metric values for every point of the given duration.
     *
     * <p>Each fetched document's value column is wrapped into a {@link DataTable} (an empty string
     * is used when the column is absent or {@code null}) and the result is assembled by
     * {@code Util.composeLabelValue}.
     *
     * @param condition       metric name and entity to query
     * @param valueColumnName column holding the serialized data table
     * @param labels          labels forwarded to {@code Util.composeLabelValue}
     * @param duration        time range of the query
     * @return the list produced by {@code Util.composeLabelValue}
     */
    @Override
    public List<MetricsValues> readLabeledMetricsValues(final MetricsCondition condition,
                                                        final String valueColumnName,
                                                        final List<String> labels,
                                                        final Duration duration) {
        final String tableName =
            IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
        final boolean aggregationMode = !tableName.equals(condition.getName());
        final Map<String, List<String>> indexIdsGroup = new HashMap<>();
        final List<String> ids =
            groupDocIdsByIndex(condition, duration, tableName, aggregationMode, indexIdsGroup);

        final Map<String, DataTable> idMap = new HashMap<>();
        final Optional<Documents> response = getClient().ids(indexIdsGroup);
        if (response.isPresent()) {
            for (final Document document : response.get()) {
                final Object raw = document.getSource().get(valueColumnName);
                idMap.put(document.getId(), new DataTable(raw == null ? "" : (String) raw));
            }
        }
        return Util.composeLabelValue(condition, labels, ids, idMap);
    }

    /**
     * Reads a heat map for the given duration.
     *
     * <p>The histogram of each point is read from the {@link HistogramMetrics#DATASET} column of
     * the fetched document; {@code valueColumnName} is not consulted by this implementation.
     * When the id lookup yields no result at all, an empty {@link HeatMap} is returned without
     * filling missing columns.
     *
     * @param condition       metric name and entity to query
     * @param valueColumnName unused here, see above
     * @param duration        time range of the query
     * @return the assembled heat map
     */
    @Override
    public HeatMap readHeatMap(final MetricsCondition condition,
                               final String valueColumnName,
                               final Duration duration) {
        final String tableName =
            IndexController.LogicIndicesRegister.getPhysicalTableName(condition.getName());
        final boolean aggregationMode = !tableName.equals(condition.getName());
        final Map<String, List<String>> indexIdsGroup = new HashMap<>();
        final List<String> ids =
            groupDocIdsByIndex(condition, duration, tableName, aggregationMode, indexIdsGroup);

        final HeatMap heatMap = new HeatMap();
        final Optional<Documents> response = getClient().ids(indexIdsGroup);
        if (!response.isPresent()) {
            return heatMap;
        }
        final Map<String, Map<String, Object>> idMap = toMap(response.get());

        final int defaultValue = ValueColumnMetadata.INSTANCE.getDefaultValue(condition.getName());
        for (final String id : ids) {
            final Map<String, Object> source = idMap.get(id);
            if (source != null) {
                final String value = (String) source.get(HistogramMetrics.DATASET);
                heatMap.buildColumn(id, value, defaultValue);
            }
        }
        heatMap.fixMissingColumns(ids, defaultValue);
        return heatMap;
    }

    /**
     * Adds the value sub-aggregation matching the given function to the parent aggregation.
     * {@link Function#Sum} maps to a sum aggregation; every other function maps to an average.
     *
     * @param function         value function of the metric
     * @param parentAggBuilder terms aggregation receiving the sub-aggregation
     * @param valueCName       value column, used both as aggregation name and as field
     */
    protected void functionAggregation(Function function,
                                       TermsAggregationBuilder parentAggBuilder,
                                       String valueCName) {
        switch (function) {
            case Sum:
                parentAggBuilder.subAggregation(Aggregation.sum(valueCName).field(valueCName));
                break;
            case Avg:
            default:
                parentAggBuilder.subAggregation(Aggregation.avg(valueCName).field(valueCName));
                break;
        }
    }

    /**
     * Builds the base search request (no hits returned, {@code size(0)}) for an aggregation query.
     *
     * <p>The query always restricts {@link Metrics#TIME_BUCKET} to the duration. An entity-id
     * filter is added when the condition's entity yields a non-null id, and a filter on
     * {@code METRIC_TABLE_NAME} is added when
     * {@code IndexController.LogicIndicesRegister.isPhysicalTable} reports {@code true} for the
     * metric name.
     *
     * @param condition metric name and entity to query
     * @param duration  time range of the query
     * @return the search builder, ready to receive aggregations
     */
    protected final SearchBuilder buildQuery(MetricsCondition condition, Duration duration) {
        final SearchBuilder sourceBuilder = Search.builder();

        final RangeQueryBuilder rangeQueryBuilder =
            Query.range(Metrics.TIME_BUCKET)
                 .gte(duration.getStartTimeBucket())
                 .lte(duration.getEndTimeBucket());

        final String entityId = condition.getEntity().buildId();
        final boolean filterByMetricName =
            IndexController.LogicIndicesRegister.isPhysicalTable(condition.getName());

        if (entityId == null) {
            if (filterByMetricName) {
                sourceBuilder.query(
                    Query.bool()
                         .must(rangeQueryBuilder)
                         .must(Query.term(
                             IndexController.LogicIndicesRegister.METRIC_TABLE_NAME,
                             condition.getName()
                         ))
                );
            } else {
                sourceBuilder.query(rangeQueryBuilder);
            }
        } else if (filterByMetricName) {
            sourceBuilder.query(
                Query.bool()
                     .must(rangeQueryBuilder)
                     .must(Query.terms(Metrics.ENTITY_ID, entityId))
                     .must(Query.term(
                         IndexController.LogicIndicesRegister.METRIC_TABLE_NAME,
                         condition.getName()
                     ))
            );
        } else {
            sourceBuilder.query(
                Query.bool()
                     .must(rangeQueryBuilder)
                     .must(Query.terms(Metrics.ENTITY_ID, entityId))
            );
        }

        sourceBuilder.size(0);
        return sourceBuilder;
    }

    /**
     * Derives one document id per point of the duration and groups the ids by the index that is
     * queried for that point.
     *
     * @param condition         metric name and entity to query
     * @param duration          time range whose points are enumerated
     * @param tableName         table name passed to {@code TimeSeriesUtils.queryIndexName}
     * @param useGeneratedDocId whether each id is passed through
     *                          {@code IndexController.INSTANCE.generateDocId} with the metric name
     * @param indexIdsGroup     output map; receives index name to the ids looked up in that index
     * @return the document ids in the order of the duration's points
     */
    private List<String> groupDocIdsByIndex(final MetricsCondition condition,
                                            final Duration duration,
                                            final String tableName,
                                            final boolean useGeneratedDocId,
                                            final Map<String, List<String>> indexIdsGroup) {
        final List<PointOfTime> pointOfTimes = duration.assembleDurationPoints();
        // The entity id does not depend on the point, so compute it once.
        final String entityId = condition.getEntity().buildId();

        final List<String> ids = pointOfTimes.stream().map(pointOfTime -> {
            final String id = pointOfTime.id(entityId);
            return useGeneratedDocId
                ? IndexController.INSTANCE.generateDocId(condition.getName(), id)
                : id;
        }).collect(Collectors.toList());

        for (int i = 0; i < pointOfTimes.size(); i++) {
            final String indexName = TimeSeriesUtils.queryIndexName(
                tableName, pointOfTimes.get(i).getPoint(), duration.getStep(), false, false);
            indexIdsGroup.computeIfAbsent(indexName, key -> new ArrayList<>()).add(ids.get(i));
        }
        return ids;
    }

    /**
     * Indexes search hits by id.
     *
     * @param hits hits to index
     * @return map from hit id to hit source; a later hit with the same id replaces an earlier one
     */
    private Map<String, Map<String, Object>> toMap(SearchHits hits) {
        final Map<String, Map<String, Object>> result = new HashMap<>();
        for (final SearchHit hit : hits) {
            result.put(hit.getId(), hit.getSource());
        }
        return result;
    }

    /**
     * Indexes fetched documents by id.
     *
     * @param documents documents to index
     * @return map from document id to document source; a later document with the same id replaces
     *         an earlier one
     */
    private Map<String, Map<String, Object>> toMap(Documents documents) {
        final Map<String, Map<String, Object>> result = new HashMap<>();
        for (final Document document : documents) {
            result.put(document.getId(), document.getSource());
        }
        return result;
    }
}