blob: 2d095bed215152c7f8ec6687e746725c53050535 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.atlas.discovery;
import org.apache.atlas.AtlasException;
import org.apache.atlas.model.discovery.AtlasAggregationEntry;
import org.apache.atlas.model.discovery.SearchParameters;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AggregationContext;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphIndexClient;
import org.apache.atlas.type.AtlasEntityType;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.atlas.type.AtlasTypeRegistry;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
public class SearchAggregatorImpl implements SearchAggregator {
private static final Logger LOG = LoggerFactory.getLogger(SearchAggregatorImpl.class);
private final SearchContext searchContext;
public SearchAggregatorImpl(SearchContext searchContext) {
this.searchContext = searchContext;
}
public Map<String, List<AtlasAggregationEntry>> getAggregatedMetrics(Set<String> aggregationFields,
Set<AtlasAttribute> aggregationAttributes) {
SearchParameters searchParameters = searchContext.getSearchParameters();
AtlasGraph graph = searchContext.getGraph();
AtlasTypeRegistry typeRegistry = searchContext.getTypeRegistry();
String queryString = searchParameters.getQuery();
List<PostProcessor> postProcessors = new ArrayList<>();
postProcessors.add(new ServiceTypeAggregator(searchContext.getTypeRegistry()));
try {
AtlasGraphIndexClient graphIndexClient = graph.getGraphIndexClient();
Set<AtlasEntityType> searchForEntityType = searchContext.getEntityTypes();
Map<String, String> indexFieldNameCache = new HashMap<>();
for (String fieldName: aggregationFields) {
String indexFieldName = getIndexFieldNameForCommonFieldName(typeRegistry, fieldName);
indexFieldNameCache.put(fieldName, indexFieldName);
}
for (AtlasAttribute attribute: aggregationAttributes) {
String indexFieldName = attribute.getIndexFieldName();
if (indexFieldName == null) {
//there is no index field name.
indexFieldName = attribute.getQualifiedName();
}
indexFieldNameCache.put(attribute.getQualifiedName(), indexFieldName);
}
AggregationContext aggregatorContext = new AggregationContext(queryString,
searchParameters.getEntityFilters(),
searchForEntityType,
aggregationFields,
aggregationAttributes,
indexFieldNameCache,
searchParameters.getExcludeDeletedEntities(),
searchParameters.getIncludeSubTypes());
Map<String, List<AtlasAggregationEntry>> aggregatedMetrics = graphIndexClient.getAggregatedMetrics(aggregatorContext);
for (String aggregationMetricName: aggregatedMetrics.keySet()) {
for (PostProcessor postProcessor: postProcessors) {
if (postProcessor.needsProcessing(aggregationMetricName)) {
postProcessor.prepareForMetric(aggregationMetricName);
for (AtlasAggregationEntry aggregationEntry: aggregatedMetrics.get(aggregationMetricName)) {
postProcessor.process(aggregationEntry);
}
postProcessor.handleMetricCompletion(aggregationMetricName);
}
}
}
for (PostProcessor postProcessor: postProcessors) {
postProcessor.handleCompletion(aggregatedMetrics);
}
return aggregatedMetrics;
} catch (AtlasException e) {
LOG.error("Error encountered in post processing stage of aggrgation metrics collection. Empty metrics will be returned.", e);
return Collections.EMPTY_MAP;
}
}
private String getIndexFieldNameForCommonFieldName(AtlasTypeRegistry typeRegistry, String fieldName) {
String indexFieldName = typeRegistry.getIndexFieldName(fieldName);
if(indexFieldName != null) {
return indexFieldName;
}
if(LOG.isDebugEnabled()) {
LOG.debug("Could not find index field name from type registry for attribute {}. Will use use the field name as is.", fieldName);
}
return fieldName;
}
static interface PostProcessor {
boolean needsProcessing(String metricName);
void prepareForMetric(String metricName);
void process(AtlasAggregationEntry aggregationEntry);
void handleMetricCompletion(String metricName);
void handleCompletion(Map<String, List<AtlasAggregationEntry>> aggregatedMetrics);
}
static class ServiceTypeAggregator implements PostProcessor {
private static final String SERVICE_TYPE = "ServiceType";
private final AtlasTypeRegistry typeRegistry;
private final List<AtlasAggregationEntry> entries = new ArrayList<>();;
private final Map<String, AtlasAggregationEntry> entityType2MetricsMap = new HashMap<>();
public ServiceTypeAggregator(AtlasTypeRegistry typeRegistry) {
this.typeRegistry = typeRegistry;
}
@Override
public boolean needsProcessing(String metricName) {
return Constants.ENTITY_TYPE_PROPERTY_KEY.equals(metricName);
}
@Override
public void prepareForMetric(String metricName) {
Map<String, AtlasAggregationEntry> serviceName2MetricsMap = new HashMap<>();
//prepare the service map to aggregations
for(String serviceName: typeRegistry.getAllServiceTypes()) {
AtlasAggregationEntry serviceMetrics = new AtlasAggregationEntry(serviceName, 0);
serviceName2MetricsMap.put(serviceName, serviceMetrics);
entries.add(serviceMetrics);
}
//prepare the map from entity type to aggregations
for(AtlasEntityType entityType: typeRegistry.getAllEntityTypes()) {
String serviceName = entityType.getServiceType();
entityType2MetricsMap.put(entityType.getTypeName(), serviceName2MetricsMap.get(serviceName));
}
}
@Override
public void process(AtlasAggregationEntry aggregationEntryForType) {
String entityType = aggregationEntryForType.getName();
AtlasAggregationEntry atlasAggregationEntryForService = entityType2MetricsMap.get(entityType);
//atlasAggregationEntryForService can be null--classifications for e.g.
if (atlasAggregationEntryForService != null) {
atlasAggregationEntryForService.setCount(atlasAggregationEntryForService.getCount() + aggregationEntryForType.getCount());
}
}
@Override
public void handleMetricCompletion(String metricName) {
//do nothing
}
@Override
public void handleCompletion(Map<String, List<AtlasAggregationEntry>> aggregatedMetrics) {
//remove all zero count entries.
for (int i = entries.size() - 1; i >= 0; i--) {
if (entries.get(i).getCount() == 0) {
entries.remove(i);
}
}
if (CollectionUtils.isNotEmpty(entries)) {
aggregatedMetrics.put(SERVICE_TYPE, entries);
}
}
}
}