blob: 5d7bd48259034f24ca8aea749e32cb07822bcda3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
import com.google.gson.Gson;
import org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Query;
import org.apache.skywalking.library.elasticsearch.requests.search.Search;
import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
import org.apache.skywalking.library.elasticsearch.requests.search.Sort;
import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTaskRecord;
import org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTriggerType;
import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTask;
import org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter;
import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class EBPFProfilingTaskEsDAO extends EsDAO implements IEBPFProfilingTaskDAO {
private static final Gson GSON = new Gson();
private final int taskMaxSize;
public EBPFProfilingTaskEsDAO(ElasticSearchClient client, StorageModuleElasticsearchConfig config) {
super(client);
this.taskMaxSize = config.getProfileTaskQueryMaxSize();
}
@Override
public List<EBPFProfilingTask> queryTasksByServices(List<String> serviceIdList, long taskStartTime, long latestUpdateTime) throws IOException {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(EBPFProfilingTaskRecord.INDEX_NAME);
final BoolQueryBuilder query = Query.bool();
if (IndexController.LogicIndicesRegister.isPhysicalTable(EBPFProfilingTaskRecord.INDEX_NAME)) {
query.must(Query.term(IndexController.LogicIndicesRegister.RECORD_TABLE_NAME, EBPFProfilingTaskRecord.INDEX_NAME));
}
if (CollectionUtils.isNotEmpty(serviceIdList)) {
query.must(Query.terms(EBPFProfilingTaskRecord.SERVICE_ID, serviceIdList));
}
if (taskStartTime > 0) {
query.must(Query.range(EBPFProfilingTaskRecord.START_TIME).gte(taskStartTime));
}
if (latestUpdateTime > 0) {
query.must(Query.range(EBPFProfilingTaskRecord.LAST_UPDATE_TIME).gt(latestUpdateTime));
}
final SearchBuilder search = Search.builder().query(query)
.sort(EBPFProfilingTaskRecord.CREATE_TIME, Sort.Order.DESC)
.size(taskMaxSize);
final SearchResponse response = getClient().search(index, search.build());
return response.getHits().getHits().stream().map(this::parseTask).collect(Collectors.toList());
}
@Override
public List<EBPFProfilingTask> queryTasksByTargets(String serviceId, String serviceInstanceId, List<EBPFProfilingTargetType> targetTypes, long taskStartTime, long latestUpdateTime) throws IOException {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(EBPFProfilingTaskRecord.INDEX_NAME);
final BoolQueryBuilder query = Query.bool();
if (StringUtil.isNotEmpty(serviceId)) {
query.must(Query.term(EBPFProfilingTaskRecord.SERVICE_ID, serviceId));
}
if (StringUtil.isNotEmpty(serviceInstanceId)) {
query.must(Query.term(EBPFProfilingTaskRecord.INSTANCE_ID, serviceInstanceId));
}
if (CollectionUtils.isNotEmpty(targetTypes)) {
query.must(Query.terms(EBPFProfilingTaskRecord.TARGET_TYPE, targetTypes.stream()
.map(EBPFProfilingTargetType::value).collect(Collectors.toList())));
}
if (taskStartTime > 0) {
query.must(Query.range(EBPFProfilingTaskRecord.START_TIME).gte(taskStartTime));
}
if (latestUpdateTime > 0) {
query.must(Query.range(EBPFProfilingTaskRecord.LAST_UPDATE_TIME).gt(latestUpdateTime));
}
final SearchBuilder search = Search.builder().query(query)
.sort(EBPFProfilingTaskRecord.CREATE_TIME, Sort.Order.DESC)
.size(taskMaxSize);
final SearchResponse response = getClient().search(index, search.build());
return response.getHits().getHits().stream().map(this::parseTask).collect(Collectors.toList());
}
@Override
public EBPFProfilingTask queryById(String id) throws IOException {
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(EBPFProfilingTaskRecord.INDEX_NAME);
final BoolQueryBuilder query = Query.bool().must(Query.term(EBPFProfilingTaskRecord.LOGICAL_ID, id));
final SearchBuilder search = Search.builder().query(query).size(taskMaxSize);
final SearchResponse response = getClient().search(index, search.build());
final List<EBPFProfilingTask> tasks = response.getHits().getHits().stream().map(this::parseTask).collect(Collectors.toList());
if (CollectionUtils.isEmpty(tasks)) {
return null;
}
EBPFProfilingTask result = tasks.get(0);
for (int i = 1; i < tasks.size(); i++) {
result = result.combine(tasks.get(i));
}
return result;
}
private EBPFProfilingTask parseTask(final SearchHit hit) {
final Map<String, Object> sourceAsMap = hit.getSource();
final EBPFProfilingTaskRecord.Builder builder = new EBPFProfilingTaskRecord.Builder();
final EBPFProfilingTaskRecord record = builder.storage2Entity(new HashMapConverter.ToEntity(sourceAsMap));
final EBPFProfilingTask task = new EBPFProfilingTask();
task.setTaskId(record.getLogicalId());
task.setServiceId(record.getServiceId());
task.setServiceName(IDManager.ServiceID.analysisId(record.getServiceId()).getName());
if (StringUtil.isNotEmpty(record.getProcessLabelsJson())) {
task.setProcessLabels(GSON.<List<String>>fromJson(record.getProcessLabelsJson(), ArrayList.class));
} else {
task.setProcessLabels(Collections.emptyList());
}
if (StringUtil.isNotEmpty(record.getInstanceId())) {
task.setServiceInstanceId(record.getInstanceId());
task.setServiceInstanceName(IDManager.ServiceInstanceID.analysisId(record.getInstanceId()).getName());
}
task.setTaskStartTime(record.getStartTime());
task.setTriggerType(EBPFProfilingTriggerType.valueOf(record.getTriggerType()));
task.setFixedTriggerDuration(record.getFixedTriggerDuration());
task.setTargetType(EBPFProfilingTargetType.valueOf(record.getTargetType()));
task.setCreateTime(record.getCreateTime());
task.setLastUpdateTime(record.getLastUpdateTime());
return task;
}
}