| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| * |
| */ |
| |
| package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query; |
| |
| import com.google.common.base.Strings; |
| import com.google.gson.Gson; |
| import com.google.gson.JsonElement; |
| import com.google.gson.JsonObject; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder; |
| import org.apache.skywalking.library.elasticsearch.requests.search.Query; |
| import org.apache.skywalking.library.elasticsearch.requests.search.RangeQueryBuilder; |
| import org.apache.skywalking.library.elasticsearch.requests.search.Search; |
| import org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder; |
| import org.apache.skywalking.library.elasticsearch.requests.search.SearchParams; |
| import org.apache.skywalking.library.elasticsearch.response.search.SearchHit; |
| import org.apache.skywalking.library.elasticsearch.response.search.SearchResponse; |
| import org.apache.skywalking.oap.server.core.analysis.IDManager; |
| import org.apache.skywalking.oap.server.core.analysis.Layer; |
| import org.apache.skywalking.oap.server.core.analysis.TimeBucket; |
| import org.apache.skywalking.oap.server.core.analysis.manual.endpoint.EndpointTraffic; |
| import org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic; |
| import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessDetectType; |
| import org.apache.skywalking.oap.server.core.analysis.manual.process.ProcessTraffic; |
| import org.apache.skywalking.oap.server.core.analysis.manual.service.ServiceTraffic; |
| import org.apache.skywalking.oap.server.core.query.enumeration.Language; |
| import org.apache.skywalking.oap.server.core.query.enumeration.ProfilingSupportStatus; |
| import org.apache.skywalking.oap.server.core.query.type.Attribute; |
| import org.apache.skywalking.oap.server.core.query.type.Endpoint; |
| import org.apache.skywalking.oap.server.core.query.type.Process; |
| import org.apache.skywalking.oap.server.core.query.type.Service; |
| import org.apache.skywalking.oap.server.core.query.type.ServiceInstance; |
| import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO; |
| import org.apache.skywalking.oap.server.core.storage.type.HashMapConverter; |
| import org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient; |
| import org.apache.skywalking.oap.server.library.util.StringUtil; |
| import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig; |
| import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO; |
| import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController; |
| import org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.MatchCNameBuilder; |
| import static org.apache.skywalking.oap.server.core.analysis.manual.instance.InstanceTraffic.PropertyUtil.LANGUAGE; |
| |
| public class MetadataQueryEsDAO extends EsDAO implements IMetadataQueryDAO { |
| private static final Gson GSON = new Gson(); |
| |
| private final int queryMaxSize; |
| private final int scrollingBatchSize; |
| |
/**
 * Creates the DAO on top of the given Elasticsearch client and captures the
 * two paging limits this class uses from the storage configuration.
 *
 * @param client Elasticsearch client passed to the base DAO
 * @param config storage configuration supplying the metadata query max size
 *               and the scrolling batch size
 */
public MetadataQueryEsDAO(ElasticSearchClient client, StorageModuleElasticsearchConfig config) {
    super(client);
    scrollingBatchSize = config.getScrollingBatchSize();
    queryMaxSize = config.getMetadataQueryMaxSize();
}
| |
| @Override |
| public List<Service> listServices(final String layer, final String group) throws IOException { |
| final String index = |
| IndexController.LogicIndicesRegister.getPhysicalTableName(ServiceTraffic.INDEX_NAME); |
| |
| final int batchSize = Math.min(queryMaxSize, scrollingBatchSize); |
| final BoolQueryBuilder query = |
| Query.bool(); |
| final SearchBuilder search = Search.builder().query(query).size(batchSize); |
| if (StringUtil.isNotEmpty(layer)) { |
| query.must(Query.term(ServiceTraffic.LAYER, Layer.valueOf(layer).value())); |
| } |
| if (StringUtil.isNotEmpty(group)) { |
| query.must(Query.term(ServiceTraffic.GROUP, group)); |
| } |
| if (IndexController.LogicIndicesRegister.isPhysicalTable(ServiceTraffic.INDEX_NAME)) { |
| query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, ServiceTraffic.INDEX_NAME)); |
| } |
| final SearchParams params = new SearchParams().scroll(SCROLL_CONTEXT_RETENTION); |
| final List<Service> services = new ArrayList<>(); |
| |
| SearchResponse results = getClient().search(index, search.build(), params); |
| Set<String> scrollIds = new HashSet<>(); |
| try { |
| while (true) { |
| String scrollId = results.getScrollId(); |
| scrollIds.add(scrollId); |
| if (results.getHits().getTotal() == 0) { |
| break; |
| } |
| final List<Service> batch = buildServices(results); |
| services.addAll(batch); |
| // The last iterate, there is no more data |
| if (batch.size() < batchSize) { |
| break; |
| } |
| // We've got enough data |
| if (services.size() >= queryMaxSize) { |
| break; |
| } |
| results = getClient().scroll(SCROLL_CONTEXT_RETENTION, scrollId); |
| } |
| } finally { |
| scrollIds.forEach(getClient()::deleteScrollContextQuietly); |
| } |
| return services; |
| } |
| |
| @Override |
| public List<Service> getServices(final String serviceId) throws IOException { |
| final String index = |
| IndexController.LogicIndicesRegister.getPhysicalTableName(ServiceTraffic.INDEX_NAME); |
| final BoolQueryBuilder query = |
| Query.bool() |
| .must(Query.term(ServiceTraffic.SERVICE_ID, serviceId)); |
| if (IndexController.LogicIndicesRegister.isPhysicalTable(ServiceTraffic.INDEX_NAME)) { |
| query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, ServiceTraffic.INDEX_NAME)); |
| } |
| final SearchBuilder search = Search.builder().query(query).size(queryMaxSize); |
| |
| final SearchResponse response = getClient().search(index, search.build()); |
| return buildServices(response); |
| } |
| |
| @Override |
| public List<ServiceInstance> listInstances(long startTimestamp, long endTimestamp, |
| String serviceId) throws IOException { |
| final String index = |
| IndexController.LogicIndicesRegister.getPhysicalTableName(InstanceTraffic.INDEX_NAME); |
| |
| final long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(startTimestamp); |
| final BoolQueryBuilder query = |
| Query.bool() |
| .must(Query.range(InstanceTraffic.LAST_PING_TIME_BUCKET).gte(minuteTimeBucket)) |
| .must(Query.term(InstanceTraffic.SERVICE_ID, serviceId)); |
| if (IndexController.LogicIndicesRegister.isPhysicalTable(InstanceTraffic.INDEX_NAME)) { |
| query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, InstanceTraffic.INDEX_NAME)); |
| } |
| final int batchSize = Math.min(queryMaxSize, scrollingBatchSize); |
| final SearchBuilder search = Search.builder().query(query).size(batchSize); |
| |
| final List<ServiceInstance> instances = new ArrayList<>(); |
| SearchResponse response = getClient().search(index, search.build()); |
| while (response.getHits().getTotal() > 0) { |
| final List<ServiceInstance> batch = buildInstances(response); |
| instances.addAll(batch); |
| if (batch.size() < batchSize) { |
| break; |
| } |
| if (batch.size() >= queryMaxSize) { |
| break; |
| } |
| response = getClient().scroll(SCROLL_CONTEXT_RETENTION, response.getScrollId()); |
| } |
| return instances; |
| } |
| |
| @Override |
| public ServiceInstance getInstance(final String instanceId) throws IOException { |
| final String index = |
| IndexController.LogicIndicesRegister.getPhysicalTableName(InstanceTraffic.INDEX_NAME); |
| String id = instanceId; |
| if (IndexController.LogicIndicesRegister.isPhysicalTable(InstanceTraffic.INDEX_NAME)) { |
| id = IndexController.INSTANCE.generateDocId(InstanceTraffic.INDEX_NAME, instanceId); |
| } |
| final BoolQueryBuilder query = |
| Query.bool() |
| .must(Query.term("_id", id)); |
| final SearchBuilder search = Search.builder().query(query).size(1); |
| |
| final SearchResponse response = getClient().search(index, search.build()); |
| final List<ServiceInstance> instances = buildInstances(response); |
| return instances.size() > 0 ? instances.get(0) : null; |
| } |
| |
| @Override |
| public List<Endpoint> findEndpoint(String keyword, String serviceId, int limit) |
| throws IOException { |
| final String index = IndexController.LogicIndicesRegister.getPhysicalTableName( |
| EndpointTraffic.INDEX_NAME); |
| |
| final BoolQueryBuilder query = |
| Query.bool() |
| .must(Query.term(EndpointTraffic.SERVICE_ID, serviceId)); |
| |
| if (!Strings.isNullOrEmpty(keyword)) { |
| String matchCName = MatchCNameBuilder.INSTANCE.build(EndpointTraffic.NAME); |
| query.must(Query.match(matchCName, keyword)); |
| } |
| |
| if (IndexController.LogicIndicesRegister.isPhysicalTable(EndpointTraffic.INDEX_NAME)) { |
| query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, EndpointTraffic.INDEX_NAME)); |
| } |
| |
| final SearchBuilder search = Search.builder().query(query).size(limit); |
| |
| final SearchResponse response = getClient().search(index, search.build()); |
| |
| List<Endpoint> endpoints = new ArrayList<>(); |
| for (SearchHit searchHit : response.getHits()) { |
| Map<String, Object> sourceAsMap = searchHit.getSource(); |
| |
| final EndpointTraffic endpointTraffic = |
| new EndpointTraffic.Builder().storage2Entity(new HashMapConverter.ToEntity(sourceAsMap)); |
| |
| Endpoint endpoint = new Endpoint(); |
| endpoint.setId(endpointTraffic.id()); |
| endpoint.setName((String) sourceAsMap.get(EndpointTraffic.NAME)); |
| endpoints.add(endpoint); |
| } |
| |
| return endpoints; |
| } |
| |
| @Override |
| public List<Process> listProcesses(String serviceId, ProfilingSupportStatus supportStatus, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException { |
| final String index = |
| IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME); |
| |
| final BoolQueryBuilder query = Query.bool(); |
| if (IndexController.LogicIndicesRegister.isPhysicalTable(ProcessTraffic.INDEX_NAME)) { |
| query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, ProcessTraffic.INDEX_NAME)); |
| } |
| final SearchBuilder search = Search.builder().query(query).size(queryMaxSize); |
| appendProcessWhereQuery(query, serviceId, null, null, supportStatus, lastPingStartTimeBucket, lastPingEndTimeBucket); |
| final SearchResponse results = getClient().search(index, search.build()); |
| |
| return buildProcesses(results); |
| } |
| |
| @Override |
| public List<Process> listProcesses(String serviceInstanceId, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException { |
| final String index = |
| IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME); |
| |
| final BoolQueryBuilder query = Query.bool(); |
| if (IndexController.LogicIndicesRegister.isPhysicalTable(ProcessTraffic.INDEX_NAME)) { |
| query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, ProcessTraffic.INDEX_NAME)); |
| } |
| final SearchBuilder search = Search.builder().query(query).size(queryMaxSize); |
| appendProcessWhereQuery(query, null, serviceInstanceId, null, null, lastPingStartTimeBucket, lastPingEndTimeBucket); |
| final SearchResponse results = getClient().search(index, search.build()); |
| |
| return buildProcesses(results); |
| } |
| |
| @Override |
| public List<Process> listProcesses(String agentId) throws IOException { |
| final String index = |
| IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME); |
| |
| final BoolQueryBuilder query = Query.bool(); |
| if (IndexController.LogicIndicesRegister.isPhysicalTable(ProcessTraffic.INDEX_NAME)) { |
| query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, ProcessTraffic.INDEX_NAME)); |
| } |
| final SearchBuilder search = Search.builder().query(query).size(queryMaxSize); |
| appendProcessWhereQuery(query, null, null, agentId, null, 0, 0); |
| final SearchResponse results = getClient().search(index, search.build()); |
| |
| return buildProcesses(results); |
| } |
| |
| @Override |
| public long getProcessCount(String serviceId, ProfilingSupportStatus profilingSupportStatus, long lastPingStartTimeBucket, long lastPingEndTimeBucket) throws IOException { |
| final String index = |
| IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME); |
| |
| final BoolQueryBuilder query = Query.bool(); |
| if (IndexController.LogicIndicesRegister.isPhysicalTable(ProcessTraffic.INDEX_NAME)) { |
| query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, ProcessTraffic.INDEX_NAME)); |
| } |
| final SearchBuilder search = Search.builder().query(query).size(0); |
| appendProcessWhereQuery(query, serviceId, null, null, profilingSupportStatus, |
| lastPingStartTimeBucket, lastPingEndTimeBucket); |
| final SearchResponse results = getClient().search(index, search.build()); |
| |
| return results.getHits().getTotal(); |
| } |
| |
| @Override |
| public long getProcessCount(String instanceId) throws IOException { |
| final String index = |
| IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME); |
| |
| final BoolQueryBuilder query = Query.bool(); |
| if (IndexController.LogicIndicesRegister.isPhysicalTable(ProcessTraffic.INDEX_NAME)) { |
| query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, ProcessTraffic.INDEX_NAME)); |
| } |
| final SearchBuilder search = Search.builder().query(query).size(0); |
| appendProcessWhereQuery(query, null, instanceId, null, null, 0, 0); |
| final SearchResponse results = getClient().search(index, search.build()); |
| |
| return results.getHits().getTotal(); |
| } |
| |
| private void appendProcessWhereQuery(BoolQueryBuilder query, String serviceId, String instanceId, String agentId, |
| final ProfilingSupportStatus profilingSupportStatus, |
| final long lastPingStartTimeBucket, final long lastPingEndTimeBucket) { |
| if (StringUtil.isNotEmpty(serviceId)) { |
| query.must(Query.term(ProcessTraffic.SERVICE_ID, serviceId)); |
| } |
| if (StringUtil.isNotEmpty(instanceId)) { |
| query.must(Query.term(ProcessTraffic.INSTANCE_ID, instanceId)); |
| } |
| if (StringUtil.isNotEmpty(agentId)) { |
| query.must(Query.term(ProcessTraffic.AGENT_ID, agentId)); |
| } |
| if (profilingSupportStatus != null) { |
| query.must(Query.term(ProcessTraffic.PROFILING_SUPPORT_STATUS, profilingSupportStatus.value())); |
| } |
| final RangeQueryBuilder rangeQuery = Query.range(ProcessTraffic.LAST_PING_TIME_BUCKET); |
| if (lastPingStartTimeBucket > 0) { |
| rangeQuery.gte(lastPingStartTimeBucket); |
| } |
| if (lastPingEndTimeBucket > 0) { |
| rangeQuery.lte(lastPingEndTimeBucket); |
| } |
| if (lastPingStartTimeBucket > 0 || lastPingEndTimeBucket > 0) { |
| query.must(rangeQuery); |
| } |
| query.mustNot(Query.term(ProcessTraffic.DETECT_TYPE, ProcessDetectType.VIRTUAL.value())); |
| } |
| |
| @Override |
| public Process getProcess(String processId) throws IOException { |
| final String index = |
| IndexController.LogicIndicesRegister.getPhysicalTableName(ProcessTraffic.INDEX_NAME); |
| final BoolQueryBuilder query = Query.bool() |
| .must(Query.term("_id", processId)); |
| if (IndexController.LogicIndicesRegister.isPhysicalTable(ProcessTraffic.INDEX_NAME)) { |
| query.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, ProcessTraffic.INDEX_NAME)); |
| } |
| final SearchBuilder search = Search.builder().query(query).size(queryMaxSize); |
| |
| final SearchResponse response = getClient().search(index, search.build()); |
| final List<Process> processes = buildProcesses(response); |
| return processes.isEmpty() ? null : processes.get(0); |
| } |
| |
| private List<Service> buildServices(SearchResponse response) { |
| List<Service> services = new ArrayList<>(); |
| for (SearchHit hit : response.getHits()) { |
| final Map<String, Object> sourceAsMap = hit.getSource(); |
| final ServiceTraffic.Builder builder = new ServiceTraffic.Builder(); |
| final ServiceTraffic serviceTraffic = builder.storage2Entity(new HashMapConverter.ToEntity(sourceAsMap)); |
| String serviceName = serviceTraffic.getName(); |
| Service service = new Service(); |
| service.setId(serviceTraffic.getServiceId()); |
| service.setName(serviceName); |
| service.setShortName(serviceTraffic.getShortName()); |
| service.setGroup(serviceTraffic.getGroup()); |
| service.getLayers().add(serviceTraffic.getLayer().name()); |
| services.add(service); |
| } |
| return services; |
| } |
| |
| private List<ServiceInstance> buildInstances(SearchResponse response) { |
| List<ServiceInstance> serviceInstances = new ArrayList<>(); |
| for (SearchHit searchHit : response.getHits()) { |
| Map<String, Object> sourceAsMap = searchHit.getSource(); |
| |
| final InstanceTraffic instanceTraffic = |
| new InstanceTraffic.Builder().storage2Entity(new HashMapConverter.ToEntity(sourceAsMap)); |
| |
| ServiceInstance serviceInstance = new ServiceInstance(); |
| serviceInstance.setId(instanceTraffic.id()); |
| serviceInstance.setName(instanceTraffic.getName()); |
| serviceInstance.setInstanceUUID(serviceInstance.getId()); |
| |
| JsonObject properties = instanceTraffic.getProperties(); |
| if (properties != null) { |
| for (Map.Entry<String, JsonElement> property : properties.entrySet()) { |
| String key = property.getKey(); |
| String value = property.getValue().getAsString(); |
| if (key.equals(LANGUAGE)) { |
| serviceInstance.setLanguage(Language.value(value)); |
| } else { |
| serviceInstance.getAttributes().add(new Attribute(key, value)); |
| } |
| } |
| } else { |
| serviceInstance.setLanguage(Language.UNKNOWN); |
| } |
| serviceInstances.add(serviceInstance); |
| } |
| return serviceInstances; |
| } |
| |
| private List<Process> buildProcesses(SearchResponse response) { |
| List<Process> processes = new ArrayList<>(); |
| for (SearchHit searchHit : response.getHits()) { |
| Map<String, Object> sourceAsMap = searchHit.getSource(); |
| |
| final ProcessTraffic processTraffic = |
| new ProcessTraffic.Builder().storage2Entity(new HashMapConverter.ToEntity(sourceAsMap)); |
| |
| Process process = new Process(); |
| process.setId(processTraffic.id()); |
| process.setName(processTraffic.getName()); |
| final String serviceId = processTraffic.getServiceId(); |
| process.setServiceId(serviceId); |
| process.setServiceName(IDManager.ServiceID.analysisId(serviceId).getName()); |
| final String instanceId = processTraffic.getInstanceId(); |
| process.setInstanceId(instanceId); |
| process.setInstanceName(IDManager.ServiceInstanceID.analysisId(instanceId).getName()); |
| process.setAgentId(processTraffic.getAgentId()); |
| process.setDetectType(ProcessDetectType.valueOf(processTraffic.getDetectType()).name()); |
| process.setProfilingSupportStatus(ProfilingSupportStatus.valueOf(processTraffic.getProfilingSupportStatus()).name()); |
| |
| JsonObject properties = processTraffic.getProperties(); |
| if (properties != null) { |
| for (Map.Entry<String, JsonElement> property : properties.entrySet()) { |
| String key = property.getKey(); |
| String value = property.getValue().getAsString(); |
| process.getAttributes().add(new Attribute(key, value)); |
| } |
| } |
| final String labelsJson = processTraffic.getLabelsJson(); |
| if (StringUtils.isNotEmpty(labelsJson)) { |
| final List<String> labels = GSON.<List<String>>fromJson(labelsJson, ArrayList.class); |
| process.getLabels().addAll(labels); |
| } |
| processes.add(process); |
| } |
| return processes; |
| } |
| } |