| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.metron.elasticsearch.dao; |
| |
| import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER; |
| |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| |
| import org.apache.metron.elasticsearch.client.ElasticsearchClient; |
| import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; |
| import org.apache.metron.indexing.dao.AccessConfig; |
| import org.apache.metron.indexing.dao.search.FieldType; |
| import org.apache.metron.indexing.dao.search.Group; |
| import org.apache.metron.indexing.dao.search.GroupOrder; |
| import org.apache.metron.indexing.dao.search.GroupOrderType; |
| import org.apache.metron.indexing.dao.search.GroupRequest; |
| import org.apache.metron.indexing.dao.search.GroupResponse; |
| import org.apache.metron.indexing.dao.search.GroupResult; |
| import org.apache.metron.indexing.dao.search.InvalidSearchException; |
| import org.apache.metron.indexing.dao.search.SearchDao; |
| import org.apache.metron.indexing.dao.search.SearchRequest; |
| import org.apache.metron.indexing.dao.search.SearchResponse; |
| import org.apache.metron.indexing.dao.search.SearchResult; |
| import org.apache.metron.indexing.dao.search.SortField; |
| import org.apache.metron.indexing.dao.search.SortOrder; |
| import org.elasticsearch.index.mapper.LegacyIpFieldMapper; |
| import org.elasticsearch.index.query.QueryBuilder; |
| import org.elasticsearch.index.query.QueryStringQueryBuilder; |
| import org.elasticsearch.search.SearchHit; |
| import org.elasticsearch.search.aggregations.Aggregation; |
| import org.elasticsearch.search.aggregations.AggregationBuilders; |
| import org.elasticsearch.search.aggregations.Aggregations; |
| import org.elasticsearch.search.aggregations.bucket.terms.Terms; |
| import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket; |
| import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order; |
| import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; |
| import org.elasticsearch.search.aggregations.metrics.sum.Sum; |
| import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder; |
| import org.elasticsearch.search.builder.SearchSourceBuilder; |
| import org.elasticsearch.search.sort.FieldSortBuilder; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class ElasticsearchSearchDao implements SearchDao { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| |
| /** |
| * The value required to ensure that Elasticsearch sorts missing values last. |
| */ |
| private static final String SORT_MISSING_LAST = "_last"; |
| |
| /** |
| * The value required to ensure that Elasticsearch sorts missing values last. |
| */ |
| private static final String SORT_MISSING_FIRST = "_first"; |
| |
| private transient ElasticsearchClient client; |
| private AccessConfig accessConfig; |
| private ElasticsearchColumnMetadataDao columnMetadataDao; |
| private ElasticsearchRequestSubmitter requestSubmitter; |
| |
| public ElasticsearchSearchDao(ElasticsearchClient client, |
| AccessConfig accessConfig, |
| ElasticsearchColumnMetadataDao columnMetadataDao, |
| ElasticsearchRequestSubmitter requestSubmitter) { |
| this.client = client; |
| this.accessConfig = accessConfig; |
| this.columnMetadataDao = columnMetadataDao; |
| this.requestSubmitter = requestSubmitter; |
| } |
| |
| @Override |
| public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { |
| if(searchRequest.getQuery() == null) { |
| throw new InvalidSearchException("Search query is invalid: null"); |
| } |
| return search(searchRequest, new QueryStringQueryBuilder(searchRequest.getQuery())); |
| } |
| |
| @Override |
| public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { |
| return group(groupRequest, new QueryStringQueryBuilder(groupRequest.getQuery())); |
| } |
| |
| /** |
| * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} for the query. |
| * @param request The request defining the parameters of the search |
| * @param queryBuilder The actual query to be run. Intended for if the SearchRequest requires wrapping |
| * @return The results of the query |
| * @throws InvalidSearchException When the query is malformed or the current state doesn't allow search |
| */ |
| protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder) throws InvalidSearchException { |
| org.elasticsearch.action.search.SearchRequest esRequest; |
| org.elasticsearch.action.search.SearchResponse esResponse; |
| |
| if(client == null) { |
| throw new InvalidSearchException("Uninitialized Dao! You must call init() prior to use."); |
| } |
| |
| if (request.getSize() > accessConfig.getMaxSearchResults()) { |
| throw new InvalidSearchException("Search result size must be less than " + accessConfig.getMaxSearchResults()); |
| } |
| |
| esRequest = buildSearchRequest(request, queryBuilder); |
| esResponse = requestSubmitter.submitSearch(esRequest); |
| return buildSearchResponse(request, esResponse); |
| } |
| |
| /** |
| * Builds an Elasticsearch search request. |
| * @param searchRequest The Metron search request. |
| * @param queryBuilder |
| * @return An Elasticsearch search request. |
| */ |
| private org.elasticsearch.action.search.SearchRequest buildSearchRequest( |
| SearchRequest searchRequest, |
| QueryBuilder queryBuilder) throws InvalidSearchException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Got search request; request={}", ElasticsearchUtils.toJSON(searchRequest).orElse("???")); |
| } |
| SearchSourceBuilder searchBuilder = new SearchSourceBuilder() |
| .size(searchRequest.getSize()) |
| .from(searchRequest.getFrom()) |
| .query(queryBuilder) |
| .trackScores(true); |
| List<String> fields = searchRequest.getFields(); |
| // column metadata needed to understand the type of each sort field |
| Map<String, FieldType> meta; |
| try { |
| meta = columnMetadataDao.getColumnMetadata(searchRequest.getIndices()); |
| } catch(IOException e) { |
| throw new InvalidSearchException("Unable to get column metadata", e); |
| } |
| |
| // handle sort fields |
| for(SortField sortField : searchRequest.getSort()) { |
| |
| // what type is the sort field? |
| FieldType sortFieldType = meta.getOrDefault(sortField.getField(), FieldType.OTHER); |
| |
| // sort order - if ascending missing values sorted last. otherwise, missing values sorted first |
| org.elasticsearch.search.sort.SortOrder sortOrder = getElasticsearchSortOrder(sortField.getSortOrder()); |
| String missingSortOrder; |
| if(sortOrder == org.elasticsearch.search.sort.SortOrder.DESC) { |
| missingSortOrder = SORT_MISSING_LAST; |
| } else { |
| missingSortOrder = SORT_MISSING_FIRST; |
| } |
| |
| // sort by the field - missing fields always last |
| FieldSortBuilder sortBy = new FieldSortBuilder(sortField.getField()) |
| .order(sortOrder) |
| .missing(missingSortOrder) |
| .unmappedType(sortFieldType.getFieldType()); |
| searchBuilder.sort(sortBy); |
| } |
| |
| // handle search fields |
| if (fields != null) { |
| searchBuilder.fetchSource("*", null); |
| } else { |
| searchBuilder.fetchSource(true); |
| } |
| |
| List<String> facetFields = searchRequest.getFacetFields(); |
| |
| // handle facet fields |
| if (facetFields != null) { |
| // https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/_bucket_aggregations.html |
| for(String field : facetFields) { |
| String name = getFacetAggregationName(field); |
| TermsAggregationBuilder terms = AggregationBuilders.terms( name).field(field); |
| // new TermsBuilder(name).field(field); |
| searchBuilder.aggregation(terms); |
| } |
| } |
| |
| // return the search request |
| String[] indices = wildcardIndices(searchRequest.getIndices()); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Built Elasticsearch request; indices={}, request={}", indices, searchBuilder.toString()); |
| } |
| return new org.elasticsearch.action.search.SearchRequest() |
| .indices(indices) |
| .source(searchBuilder); |
| } |
| |
| /** |
| * Builds a search response. |
| * |
| * This effectively transforms an Elasticsearch search response into a Metron search response. |
| * |
| * @param searchRequest The Metron search request. |
| * @param esResponse The Elasticsearch search response. |
| * @return A Metron search response. |
| * @throws InvalidSearchException |
| */ |
| private SearchResponse buildSearchResponse( |
| SearchRequest searchRequest, |
| org.elasticsearch.action.search.SearchResponse esResponse) throws InvalidSearchException { |
| |
| SearchResponse searchResponse = new SearchResponse(); |
| |
| searchResponse.setTotal(esResponse.getHits().getTotalHits()); |
| |
| // search hits --> search results |
| List<SearchResult> results = new ArrayList<>(); |
| for(SearchHit hit: esResponse.getHits().getHits()) { |
| results.add(getSearchResult(hit, searchRequest.getFields())); |
| } |
| searchResponse.setResults(results); |
| |
| // handle facet fields |
| if (searchRequest.getFacetFields() != null) { |
| List<String> facetFields = searchRequest.getFacetFields(); |
| Map<String, FieldType> commonColumnMetadata; |
| try { |
| commonColumnMetadata = columnMetadataDao.getColumnMetadata(searchRequest.getIndices()); |
| } catch (IOException e) { |
| throw new InvalidSearchException(String.format( |
| "Could not get common column metadata for indices %s", |
| Arrays.toString(searchRequest.getIndices().toArray()))); |
| } |
| searchResponse.setFacetCounts(getFacetCounts(facetFields, esResponse.getAggregations(), commonColumnMetadata )); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Built search response; response={}", ElasticsearchUtils.toJSON(searchResponse).orElse("???")); |
| } |
| return searchResponse; |
| } |
| |
| private org.elasticsearch.search.sort.SortOrder getElasticsearchSortOrder( |
| org.apache.metron.indexing.dao.search.SortOrder sortOrder) { |
| return sortOrder == org.apache.metron.indexing.dao.search.SortOrder.DESC ? |
| org.elasticsearch.search.sort.SortOrder.DESC : org.elasticsearch.search.sort.SortOrder.ASC; |
| } |
| |
| private String getFacetAggregationName(String field) { |
| return String.format("%s_count", field); |
| } |
| |
| private String[] wildcardIndices(List<String> indices) { |
| if(indices == null) |
| return new String[] {}; |
| |
| return indices |
| .stream() |
| .map(index -> String.format("%s%s*", index, INDEX_NAME_DELIMITER)) |
| .toArray(value -> new String[indices.size()]); |
| } |
| |
| private SearchResult getSearchResult(SearchHit searchHit, List<String> fields) { |
| SearchResult searchResult = new SearchResult(); |
| searchResult.setId(searchHit.getId()); |
| Map<String, Object> source; |
| if (fields != null) { |
| Map<String, Object> resultSourceAsMap = searchHit.getSourceAsMap(); |
| source = new HashMap<>(); |
| fields.forEach(field -> { |
| source.put(field, resultSourceAsMap.get(field)); |
| }); |
| } else { |
| source = searchHit.getSource(); |
| } |
| searchResult.setSource(source); |
| searchResult.setScore(searchHit.getScore()); |
| searchResult.setIndex(searchHit.getIndex()); |
| return searchResult; |
| } |
| |
| private Map<String, Map<String, Long>> getFacetCounts(List<String> fields, Aggregations aggregations, Map<String, FieldType> commonColumnMetadata) { |
| Map<String, Map<String, Long>> fieldCounts = new HashMap<>(); |
| for (String field: fields) { |
| Map<String, Long> valueCounts = new HashMap<>(); |
| if(aggregations != null ) { |
| Aggregation aggregation = aggregations.get(getFacetAggregationName(field)); |
| if (aggregation instanceof Terms) { |
| Terms terms = (Terms) aggregation; |
| terms.getBuckets().stream().forEach(bucket -> valueCounts.put(formatKey(bucket.getKey(), commonColumnMetadata.get(field)), bucket.getDocCount())); |
| } |
| } |
| fieldCounts.put(field, valueCounts); |
| } |
| return fieldCounts; |
| } |
| |
| private String formatKey(Object key, FieldType type) { |
| if (FieldType.IP.equals(type) && key instanceof Long) { |
| return LegacyIpFieldMapper.longToIp((Long) key); |
| } else if (FieldType.BOOLEAN.equals(type)) { |
| return (Long) key == 1 ? "true" : "false"; |
| } else { |
| return key.toString(); |
| } |
| } |
| |
| /** |
| * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} for the query. |
| * @param groupRequest The request defining the parameters of the grouping |
| * @param queryBuilder The actual query to be run. Intended for if the SearchRequest requires wrapping |
| * @return The results of the query |
| * @throws InvalidSearchException When the query is malformed or the current state doesn't allow search |
| */ |
| protected GroupResponse group(GroupRequest groupRequest, QueryBuilder queryBuilder) |
| throws InvalidSearchException { |
| org.elasticsearch.action.search.SearchRequest esRequest; |
| org.elasticsearch.action.search.SearchResponse esResponse; |
| |
| if (client == null) { |
| throw new InvalidSearchException("Uninitialized Dao! You must call init() prior to use."); |
| } |
| if (groupRequest.getGroups() == null || groupRequest.getGroups().size() == 0) { |
| throw new InvalidSearchException("At least 1 group must be provided."); |
| } |
| |
| esRequest = buildGroupRequest(groupRequest, queryBuilder); |
| esResponse = requestSubmitter.submitSearch(esRequest); |
| GroupResponse response = buildGroupResponse(groupRequest, esResponse); |
| |
| return response; |
| } |
| |
| /** |
| * Builds a group search request. |
| * @param groupRequest The Metron group request. |
| * @param queryBuilder The search query. |
| * @return An Elasticsearch search request. |
| */ |
| private org.elasticsearch.action.search.SearchRequest buildGroupRequest( |
| GroupRequest groupRequest, |
| QueryBuilder queryBuilder) { |
| |
| // handle groups |
| TermsAggregationBuilder groups = getGroupsTermBuilder(groupRequest, 0); |
| final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() |
| .query(queryBuilder) |
| .aggregation(groups); |
| |
| // return the search request |
| String[] indices = wildcardIndices(groupRequest.getIndices()); |
| return new org.elasticsearch.action.search.SearchRequest() |
| .indices(indices) |
| .source(searchSourceBuilder); |
| } |
| |
| private TermsAggregationBuilder getGroupsTermBuilder(GroupRequest groupRequest, int index) { |
| List<Group> groups = groupRequest.getGroups(); |
| Group group = groups.get(index); |
| String aggregationName = getGroupByAggregationName(group.getField()); |
| TermsAggregationBuilder termsBuilder = AggregationBuilders.terms(aggregationName); |
| termsBuilder |
| .field(group.getField()) |
| .size(accessConfig.getMaxSearchGroups()) |
| .order(getElasticsearchGroupOrder(group.getOrder())); |
| if (index < groups.size() - 1) { |
| termsBuilder.subAggregation(getGroupsTermBuilder(groupRequest, index + 1)); |
| } |
| Optional<String> scoreField = groupRequest.getScoreField(); |
| if (scoreField.isPresent()) { |
| SumAggregationBuilder scoreSumAggregationBuilder = AggregationBuilders.sum(getSumAggregationName(scoreField.get())).field(scoreField.get()).missing(0); |
| termsBuilder.subAggregation(scoreSumAggregationBuilder); |
| } |
| return termsBuilder; |
| } |
| |
| private String getGroupByAggregationName(String field) { |
| return String.format("%s_group", field); |
| } |
| |
| private String getSumAggregationName(String field) { |
| return String.format("%s_score", field); |
| } |
| |
| private Order getElasticsearchGroupOrder(GroupOrder groupOrder) { |
| if (groupOrder.getGroupOrderType() == GroupOrderType.TERM) { |
| return groupOrder.getSortOrder() == SortOrder.ASC ? Order.term(true) : Order.term(false); |
| } else { |
| return groupOrder.getSortOrder() == SortOrder.ASC ? Order.count(true) : Order.count(false); |
| } |
| } |
| |
| /** |
| * Build a group response. |
| * @param groupRequest The original group request. |
| * @param response The search response. |
| * @return A group response. |
| * @throws InvalidSearchException |
| */ |
| private GroupResponse buildGroupResponse( |
| GroupRequest groupRequest, |
| org.elasticsearch.action.search.SearchResponse response) throws InvalidSearchException { |
| |
| // build the search response |
| Map<String, FieldType> commonColumnMetadata; |
| try { |
| commonColumnMetadata = columnMetadataDao.getColumnMetadata(groupRequest.getIndices()); |
| } catch (IOException e) { |
| throw new InvalidSearchException(String.format("Could not get common column metadata for indices %s", |
| Arrays.toString(groupRequest.getIndices().toArray()))); |
| } |
| |
| GroupResponse groupResponse = new GroupResponse(); |
| groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField()); |
| groupResponse.setGroupResults(getGroupResults(groupRequest, 0, response.getAggregations(), commonColumnMetadata)); |
| return groupResponse; |
| } |
| |
| private List<GroupResult> getGroupResults(GroupRequest groupRequest, int index, Aggregations aggregations, Map<String, FieldType> commonColumnMetadata) { |
| List<Group> groups = groupRequest.getGroups(); |
| String field = groups.get(index).getField(); |
| List<GroupResult> searchResultGroups = new ArrayList<>(); |
| if(aggregations != null) { |
| Terms terms = aggregations.get(getGroupByAggregationName(field)); |
| for (Bucket bucket : terms.getBuckets()) { |
| GroupResult groupResult = new GroupResult(); |
| groupResult.setKey(formatKey(bucket.getKey(), commonColumnMetadata.get(field))); |
| groupResult.setTotal(bucket.getDocCount()); |
| Optional<String> scoreField = groupRequest.getScoreField(); |
| if (scoreField.isPresent()) { |
| Sum score = bucket.getAggregations().get(getSumAggregationName(scoreField.get())); |
| groupResult.setScore(score.getValue()); |
| } |
| if (index < groups.size() - 1) { |
| groupResult.setGroupedBy(groups.get(index + 1).getField()); |
| groupResult.setGroupResults(getGroupResults(groupRequest, index + 1, bucket.getAggregations(), commonColumnMetadata)); |
| } |
| searchResultGroups.add(groupResult); |
| } |
| } |
| return searchResultGroups; |
| } |
| } |