| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.metron.elasticsearch.dao; |
| |
| import com.google.common.base.Splitter; |
| import com.google.common.collect.Iterables; |
| import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; |
| import org.apache.metron.indexing.dao.AccessConfig; |
| import org.apache.metron.indexing.dao.IndexDao; |
| import org.apache.metron.indexing.dao.search.FieldType; |
| import org.apache.metron.indexing.dao.search.GetRequest; |
| import org.apache.metron.indexing.dao.search.Group; |
| import org.apache.metron.indexing.dao.search.GroupOrder; |
| import org.apache.metron.indexing.dao.search.GroupOrderType; |
| import org.apache.metron.indexing.dao.search.GroupRequest; |
| import org.apache.metron.indexing.dao.search.GroupResponse; |
| import org.apache.metron.indexing.dao.search.GroupResult; |
| import org.apache.metron.indexing.dao.search.InvalidSearchException; |
| import org.apache.metron.indexing.dao.search.SearchRequest; |
| import org.apache.metron.indexing.dao.search.SearchResponse; |
| import org.apache.metron.indexing.dao.search.SearchResult; |
| import org.apache.metron.indexing.dao.search.SortField; |
| import org.apache.metron.indexing.dao.search.SortOrder; |
| import org.apache.metron.indexing.dao.update.Document; |
| import org.elasticsearch.action.ActionWriteResponse.ShardInfo; |
| import org.elasticsearch.action.bulk.BulkRequestBuilder; |
| import org.elasticsearch.action.bulk.BulkResponse; |
| import org.elasticsearch.action.index.IndexRequest; |
| import org.elasticsearch.action.index.IndexResponse; |
| import org.elasticsearch.action.search.SearchRequestBuilder; |
| import org.elasticsearch.client.transport.TransportClient; |
| import org.elasticsearch.index.mapper.ip.IpFieldMapper; |
| import org.elasticsearch.index.query.QueryBuilder; |
| import org.elasticsearch.index.query.QueryBuilders; |
| import org.elasticsearch.index.query.QueryStringQueryBuilder; |
| import org.elasticsearch.search.SearchHit; |
| import org.elasticsearch.search.SearchHits; |
| import org.elasticsearch.search.aggregations.Aggregation; |
| import org.elasticsearch.search.aggregations.Aggregations; |
| import org.elasticsearch.search.aggregations.bucket.terms.Terms; |
| import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket; |
| import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order; |
| import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder; |
| import org.elasticsearch.search.aggregations.metrics.sum.Sum; |
| import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder; |
| import org.elasticsearch.search.builder.SearchSourceBuilder; |
| import org.elasticsearch.search.sort.FieldSortBuilder; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.function.Function; |
| |
| import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER; |
| |
| public class ElasticsearchDao implements IndexDao { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| |
| /** |
| * The value required to ensure that Elasticsearch sorts missing values last. |
| */ |
| private static final String SORT_MISSING_LAST = "_last"; |
| |
| /** |
| * The value required to ensure that Elasticsearch sorts missing values last. |
| */ |
| private static final String SORT_MISSING_FIRST = "_first"; |
| |
| /** |
| * The Elasticsearch client. |
| */ |
| private transient TransportClient client; |
| |
| /** |
| * Retrieves column metadata about search indices. |
| */ |
| private ColumnMetadataDao columnMetadataDao; |
| |
| /** |
| * Handles the submission of search requests to Elasticsearch. |
| */ |
| private ElasticsearchRequestSubmitter requestSubmitter; |
| |
| private AccessConfig accessConfig; |
| |
| protected ElasticsearchDao(TransportClient client, |
| ColumnMetadataDao columnMetadataDao, |
| ElasticsearchRequestSubmitter requestSubmitter, |
| AccessConfig config) { |
| this.client = client; |
| this.columnMetadataDao = columnMetadataDao; |
| this.requestSubmitter = requestSubmitter; |
| this.accessConfig = config; |
| } |
| |
| public ElasticsearchDao() { |
| //uninitialized. |
| } |
| |
| @Override |
| public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { |
| return search(searchRequest, new QueryStringQueryBuilder(searchRequest.getQuery())); |
| } |
| |
| /** |
| * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} for the query. |
| * @param request The request defining the parameters of the search |
| * @param queryBuilder The actual query to be run. Intended for if the SearchRequest requires wrapping |
| * @return The results of the query |
| * @throws InvalidSearchException When the query is malformed or the current state doesn't allow search |
| */ |
| protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder) throws InvalidSearchException { |
| org.elasticsearch.action.search.SearchRequest esRequest; |
| org.elasticsearch.action.search.SearchResponse esResponse; |
| |
| if(client == null) { |
| throw new InvalidSearchException("Uninitialized Dao! You must call init() prior to use."); |
| } |
| |
| if (request.getSize() > accessConfig.getMaxSearchResults()) { |
| throw new InvalidSearchException("Search result size must be less than " + accessConfig.getMaxSearchResults()); |
| } |
| |
| esRequest = buildSearchRequest(request, queryBuilder); |
| esResponse = requestSubmitter.submitSearch(esRequest); |
| return buildSearchResponse(request, esResponse); |
| } |
| |
| /** |
| * Builds an Elasticsearch search request. |
| * @param searchRequest The Metron search request. |
| * @param queryBuilder |
| * @return An Elasticsearch search request. |
| */ |
| private org.elasticsearch.action.search.SearchRequest buildSearchRequest( |
| SearchRequest searchRequest, |
| QueryBuilder queryBuilder) throws InvalidSearchException { |
| |
| LOG.debug("Got search request; request={}", ElasticsearchUtils.toJSON(searchRequest).orElse("???")); |
| SearchSourceBuilder searchBuilder = new SearchSourceBuilder() |
| .size(searchRequest.getSize()) |
| .from(searchRequest.getFrom()) |
| .query(queryBuilder) |
| .trackScores(true); |
| |
| // column metadata needed to understand the type of each sort field |
| Map<String, FieldType> meta; |
| try { |
| meta = getColumnMetadata(searchRequest.getIndices()); |
| } catch(IOException e) { |
| throw new InvalidSearchException("Unable to get column metadata", e); |
| } |
| |
| // handle sort fields |
| for(SortField sortField : searchRequest.getSort()) { |
| |
| // what type is the sort field? |
| FieldType sortFieldType = meta.getOrDefault(sortField.getField(), FieldType.OTHER); |
| |
| // sort order - if ascending missing values sorted last. otherwise, missing values sorted first |
| org.elasticsearch.search.sort.SortOrder sortOrder = getElasticsearchSortOrder(sortField.getSortOrder()); |
| String missingSortOrder; |
| if(sortOrder == org.elasticsearch.search.sort.SortOrder.DESC) { |
| missingSortOrder = SORT_MISSING_LAST; |
| } else { |
| missingSortOrder = SORT_MISSING_FIRST; |
| } |
| |
| // sort by the field - missing fields always last |
| FieldSortBuilder sortBy = new FieldSortBuilder(sortField.getField()) |
| .order(sortOrder) |
| .missing(missingSortOrder) |
| .unmappedType(sortFieldType.getFieldType()); |
| searchBuilder.sort(sortBy); |
| } |
| |
| // handle search fields |
| if (searchRequest.getFields().isPresent()) { |
| searchBuilder.fields(searchRequest.getFields().get()); |
| } else { |
| searchBuilder.fetchSource(true); |
| } |
| |
| // handle facet fields |
| if (searchRequest.getFacetFields().isPresent()) { |
| for(String field : searchRequest.getFacetFields().get()) { |
| String name = getFacentAggregationName(field); |
| TermsBuilder terms = new TermsBuilder(name).field(field); |
| searchBuilder.aggregation(terms); |
| } |
| } |
| |
| // return the search request |
| String[] indices = wildcardIndices(searchRequest.getIndices()); |
| LOG.debug("Built Elasticsearch request; indices={}, request={}", indices, searchBuilder.toString()); |
| return new org.elasticsearch.action.search.SearchRequest() |
| .indices(indices) |
| .source(searchBuilder); |
| } |
| |
| /** |
| * Builds a search response. |
| * |
| * This effectively transforms an Elasticsearch search response into a Metron search response. |
| * |
| * @param searchRequest The Metron search request. |
| * @param esResponse The Elasticsearch search response. |
| * @return A Metron search response. |
| * @throws InvalidSearchException |
| */ |
| private SearchResponse buildSearchResponse( |
| SearchRequest searchRequest, |
| org.elasticsearch.action.search.SearchResponse esResponse) throws InvalidSearchException { |
| |
| SearchResponse searchResponse = new SearchResponse(); |
| searchResponse.setTotal(esResponse.getHits().getTotalHits()); |
| |
| // search hits --> search results |
| List<SearchResult> results = new ArrayList<>(); |
| for(SearchHit hit: esResponse.getHits().getHits()) { |
| results.add(getSearchResult(hit, searchRequest.getFields().isPresent())); |
| } |
| searchResponse.setResults(results); |
| |
| // handle facet fields |
| if (searchRequest.getFacetFields().isPresent()) { |
| List<String> facetFields = searchRequest.getFacetFields().get(); |
| Map<String, FieldType> commonColumnMetadata; |
| try { |
| commonColumnMetadata = getColumnMetadata(searchRequest.getIndices()); |
| } catch (IOException e) { |
| throw new InvalidSearchException(String.format( |
| "Could not get common column metadata for indices %s", |
| Arrays.toString(searchRequest.getIndices().toArray()))); |
| } |
| searchResponse.setFacetCounts(getFacetCounts(facetFields, esResponse.getAggregations(), commonColumnMetadata )); |
| } |
| |
| LOG.debug("Built search response; response={}", ElasticsearchUtils.toJSON(searchResponse).orElse("???")); |
| return searchResponse; |
| } |
| |
| @Override |
| public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { |
| return group(groupRequest, new QueryStringQueryBuilder(groupRequest.getQuery())); |
| } |
| |
| /** |
| * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} for the query. |
| * @param groupRequest The request defining the parameters of the grouping |
| * @param queryBuilder The actual query to be run. Intended for if the SearchRequest requires wrapping |
| * @return The results of the query |
| * @throws InvalidSearchException When the query is malformed or the current state doesn't allow search |
| */ |
| protected GroupResponse group(GroupRequest groupRequest, QueryBuilder queryBuilder) |
| throws InvalidSearchException { |
| org.elasticsearch.action.search.SearchRequest esRequest; |
| org.elasticsearch.action.search.SearchResponse esResponse; |
| |
| if (client == null) { |
| throw new InvalidSearchException("Uninitialized Dao! You must call init() prior to use."); |
| } |
| if (groupRequest.getGroups() == null || groupRequest.getGroups().size() == 0) { |
| throw new InvalidSearchException("At least 1 group must be provided."); |
| } |
| |
| esRequest = buildGroupRequest(groupRequest, queryBuilder); |
| esResponse = requestSubmitter.submitSearch(esRequest); |
| GroupResponse response = buildGroupResponse(groupRequest, esResponse); |
| |
| return response; |
| } |
| |
| /** |
| * Builds a group search request. |
| * @param groupRequest The Metron group request. |
| * @param queryBuilder The search query. |
| * @return An Elasticsearch search request. |
| */ |
| private org.elasticsearch.action.search.SearchRequest buildGroupRequest( |
| GroupRequest groupRequest, |
| QueryBuilder queryBuilder) { |
| |
| // handle groups |
| TermsBuilder groups = getGroupsTermBuilder(groupRequest, 0); |
| final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() |
| .query(queryBuilder) |
| .aggregation(groups); |
| |
| // return the search request |
| String[] indices = wildcardIndices(groupRequest.getIndices()); |
| return new org.elasticsearch.action.search.SearchRequest() |
| .indices(indices) |
| .source(searchSourceBuilder); |
| } |
| |
| /** |
| * Build a group response. |
| * @param groupRequest The original group request. |
| * @param response The search response. |
| * @return A group response. |
| * @throws InvalidSearchException |
| */ |
| private GroupResponse buildGroupResponse( |
| GroupRequest groupRequest, |
| org.elasticsearch.action.search.SearchResponse response) throws InvalidSearchException { |
| |
| // build the search response |
| Map<String, FieldType> commonColumnMetadata; |
| try { |
| commonColumnMetadata = getColumnMetadata(groupRequest.getIndices()); |
| } catch (IOException e) { |
| throw new InvalidSearchException(String.format("Could not get common column metadata for indices %s", |
| Arrays.toString(groupRequest.getIndices().toArray()))); |
| } |
| |
| GroupResponse groupResponse = new GroupResponse(); |
| groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField()); |
| groupResponse.setGroupResults(getGroupResults(groupRequest, 0, response.getAggregations(), commonColumnMetadata)); |
| return groupResponse; |
| } |
| |
| private String[] wildcardIndices(List<String> indices) { |
| if(indices == null) |
| return new String[] {}; |
| |
| return indices |
| .stream() |
| .map(index -> String.format("%s%s*", index, INDEX_NAME_DELIMITER)) |
| .toArray(value -> new String[indices.size()]); |
| } |
| |
| @Override |
| public synchronized void init(AccessConfig config) { |
| if(this.client == null) { |
| this.client = ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get(), config.getOptionalSettings()); |
| this.accessConfig = config; |
| this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client.admin()); |
| this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client); |
| } |
| |
| if(columnMetadataDao == null) { |
| throw new IllegalArgumentException("No ColumnMetadataDao available"); |
| } |
| |
| if(requestSubmitter == null) { |
| throw new IllegalArgumentException("No ElasticsearchRequestSubmitter available"); |
| } |
| } |
| |
| @Override |
| public Document getLatest(final String guid, final String sensorType) throws IOException { |
| Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit)); |
| return doc.orElse(null); |
| } |
| |
| private Optional<Document> toDocument(final String guid, SearchHit hit) { |
| Long ts = 0L; |
| String doc = hit.getSourceAsString(); |
| String sourceType = toSourceType(hit.getType()); |
| try { |
| return Optional.of(new Document(doc, guid, sourceType, ts)); |
| } catch (IOException e) { |
| throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); |
| } |
| } |
| |
| /** |
| * Returns the source type based on a given doc type. |
| * @param docType The document type. |
| * @return The source type. |
| */ |
| private String toSourceType(String docType) { |
| return Iterables.getFirst(Splitter.on("_doc").split(docType), null); |
| } |
| |
| @Override |
| public Iterable<Document> getAllLatest( |
| final List<GetRequest> getRequests) throws IOException { |
| Collection<String> guids = new HashSet<>(); |
| Collection<String> sensorTypes = new HashSet<>(); |
| for (GetRequest getRequest: getRequests) { |
| guids.add(getRequest.getGuid()); |
| sensorTypes.add(getRequest.getSensorType()); |
| } |
| List<Document> documents = searchByGuids( |
| guids |
| , sensorTypes |
| , hit -> { |
| Long ts = 0L; |
| String doc = hit.getSourceAsString(); |
| String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null); |
| try { |
| return Optional.of(new Document(doc, hit.getId(), sourceType, ts)); |
| } catch (IOException e) { |
| throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); |
| } |
| } |
| |
| ); |
| return documents; |
| } |
| |
| <T> Optional<T> searchByGuid(String guid, String sensorType, |
| Function<SearchHit, Optional<T>> callback) { |
| Collection<String> sensorTypes = sensorType != null ? Collections.singleton(sensorType) : null; |
| List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, callback); |
| if (results.size() > 0) { |
| return Optional.of(results.get(0)); |
| } else { |
| return Optional.empty(); |
| } |
| } |
| |
| /** |
| * Return the search hit based on the UUID and sensor type. |
| * A callback can be specified to transform the hit into a type T. |
| * If more than one hit happens, the first one will be returned. |
| */ |
| <T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes, |
| Function<SearchHit, Optional<T>> callback) { |
| QueryBuilder query; |
| if (sensorTypes != null) { |
| String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc").toArray(String[]::new); |
| query = QueryBuilders.idsQuery(types).ids(guids); |
| } else { |
| query = QueryBuilders.idsQuery().ids(guids); |
| } |
| SearchRequestBuilder request = client.prepareSearch() |
| .setQuery(query) |
| .setSource("message") |
| .setSize(guids.size()) |
| ; |
| org.elasticsearch.action.search.SearchResponse response = request.get(); |
| SearchHits hits = response.getHits(); |
| List<T> results = new ArrayList<>(); |
| for (SearchHit hit : hits) { |
| Optional<T> result = callback.apply(hit); |
| if (result.isPresent()) { |
| results.add(result.get()); |
| } |
| } |
| return results; |
| } |
| |
| @Override |
| public void update(Document update, Optional<String> index) throws IOException { |
| String indexPostfix = ElasticsearchUtils |
| .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date()); |
| String sensorType = update.getSensorType(); |
| String indexName = getIndexName(update, index, indexPostfix); |
| |
| IndexRequest indexRequest = buildIndexRequest(update, sensorType, indexName); |
| try { |
| IndexResponse response = client.index(indexRequest).get(); |
| |
| ShardInfo shardInfo = response.getShardInfo(); |
| int failed = shardInfo.getFailed(); |
| if (failed > 0) { |
| throw new IOException( |
| "ElasticsearchDao index failed: " + Arrays.toString(shardInfo.getFailures())); |
| } |
| } catch (Exception e) { |
| throw new IOException(e.getMessage(), e); |
| } |
| } |
| |
| @Override |
| public void batchUpdate(Map<Document, Optional<String>> updates) throws IOException { |
| String indexPostfix = ElasticsearchUtils |
| .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date()); |
| |
| BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); |
| |
| // Get the indices we'll actually be using for each Document. |
| for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet()) { |
| Document update = updateEntry.getKey(); |
| String sensorType = update.getSensorType(); |
| String indexName = getIndexName(update, updateEntry.getValue(), indexPostfix); |
| IndexRequest indexRequest = buildIndexRequest( |
| update, |
| sensorType, |
| indexName |
| ); |
| |
| bulkRequestBuilder.add(indexRequest); |
| } |
| |
| BulkResponse bulkResponse = bulkRequestBuilder.get(); |
| if (bulkResponse.hasFailures()) { |
| LOG.error("Bulk Request has failures: {}", bulkResponse.buildFailureMessage()); |
| throw new IOException( |
| "ElasticsearchDao upsert failed: " + bulkResponse.buildFailureMessage()); |
| } |
| } |
| |
| protected String getIndexName(Document update, Optional<String> index, String indexPostFix) { |
| return index.orElse(getIndexName(update.getGuid(), update.getSensorType()) |
| .orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, null)) |
| ); |
| } |
| |
| protected Optional<String> getIndexName(String guid, String sensorType) { |
| return searchByGuid(guid, |
| sensorType, |
| hit -> Optional.ofNullable(hit.getIndex()) |
| ); |
| } |
| |
| protected IndexRequest buildIndexRequest(Document update, String sensorType, String indexName) { |
| String type = sensorType + "_doc"; |
| Object ts = update.getTimestamp(); |
| IndexRequest indexRequest = new IndexRequest(indexName, type, update.getGuid()) |
| .source(update.getDocument()); |
| if(ts != null) { |
| indexRequest = indexRequest.timestamp(ts.toString()); |
| } |
| |
| return indexRequest; |
| } |
| |
| @Override |
| public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException { |
| return columnMetadataDao.getColumnMetadata(indices); |
| } |
| |
| private org.elasticsearch.search.sort.SortOrder getElasticsearchSortOrder( |
| org.apache.metron.indexing.dao.search.SortOrder sortOrder) { |
| return sortOrder == org.apache.metron.indexing.dao.search.SortOrder.DESC ? |
| org.elasticsearch.search.sort.SortOrder.DESC : org.elasticsearch.search.sort.SortOrder.ASC; |
| } |
| |
| private Order getElasticsearchGroupOrder(GroupOrder groupOrder) { |
| if (groupOrder.getGroupOrderType() == GroupOrderType.TERM) { |
| return groupOrder.getSortOrder() == SortOrder.ASC ? Order.term(true) : Order.term(false); |
| } else { |
| return groupOrder.getSortOrder() == SortOrder.ASC ? Order.count(true) : Order.count(false); |
| } |
| } |
| |
| public Map<String, Map<String, Long>> getFacetCounts(List<String> fields, Aggregations aggregations, Map<String, FieldType> commonColumnMetadata) { |
| Map<String, Map<String, Long>> fieldCounts = new HashMap<>(); |
| for (String field: fields) { |
| Map<String, Long> valueCounts = new HashMap<>(); |
| Aggregation aggregation = aggregations.get(getFacentAggregationName(field)); |
| if (aggregation instanceof Terms) { |
| Terms terms = (Terms) aggregation; |
| terms.getBuckets().stream().forEach(bucket -> valueCounts.put(formatKey(bucket.getKey(), commonColumnMetadata.get(field)), bucket.getDocCount())); |
| } |
| fieldCounts.put(field, valueCounts); |
| } |
| return fieldCounts; |
| } |
| |
| private String formatKey(Object key, FieldType type) { |
| if (FieldType.IP.equals(type)) { |
| return IpFieldMapper.longToIp((Long) key); |
| } else if (FieldType.BOOLEAN.equals(type)) { |
| return (Long) key == 1 ? "true" : "false"; |
| } else { |
| return key.toString(); |
| } |
| } |
| |
| private TermsBuilder getGroupsTermBuilder(GroupRequest groupRequest, int index) { |
| List<Group> groups = groupRequest.getGroups(); |
| Group group = groups.get(index); |
| String aggregationName = getGroupByAggregationName(group.getField()); |
| TermsBuilder termsBuilder = new TermsBuilder(aggregationName) |
| .field(group.getField()) |
| .size(accessConfig.getMaxSearchGroups()) |
| .order(getElasticsearchGroupOrder(group.getOrder())); |
| if (index < groups.size() - 1) { |
| termsBuilder.subAggregation(getGroupsTermBuilder(groupRequest, index + 1)); |
| } |
| Optional<String> scoreField = groupRequest.getScoreField(); |
| if (scoreField.isPresent()) { |
| termsBuilder.subAggregation(new SumBuilder(getSumAggregationName(scoreField.get())).field(scoreField.get()).missing(0)); |
| } |
| return termsBuilder; |
| } |
| |
| private List<GroupResult> getGroupResults(GroupRequest groupRequest, int index, Aggregations aggregations, Map<String, FieldType> commonColumnMetadata) { |
| List<Group> groups = groupRequest.getGroups(); |
| String field = groups.get(index).getField(); |
| Terms terms = aggregations.get(getGroupByAggregationName(field)); |
| List<GroupResult> searchResultGroups = new ArrayList<>(); |
| for(Bucket bucket: terms.getBuckets()) { |
| GroupResult groupResult = new GroupResult(); |
| groupResult.setKey(formatKey(bucket.getKey(), commonColumnMetadata.get(field))); |
| groupResult.setTotal(bucket.getDocCount()); |
| Optional<String> scoreField = groupRequest.getScoreField(); |
| if (scoreField.isPresent()) { |
| Sum score = bucket.getAggregations().get(getSumAggregationName(scoreField.get())); |
| groupResult.setScore(score.getValue()); |
| } |
| if (index < groups.size() - 1) { |
| groupResult.setGroupedBy(groups.get(index + 1).getField()); |
| groupResult.setGroupResults(getGroupResults(groupRequest, index + 1, bucket.getAggregations(), commonColumnMetadata)); |
| } |
| searchResultGroups.add(groupResult); |
| } |
| return searchResultGroups; |
| } |
| |
| private SearchResult getSearchResult(SearchHit searchHit, boolean fieldsPresent) { |
| SearchResult searchResult = new SearchResult(); |
| searchResult.setId(searchHit.getId()); |
| Map<String, Object> source; |
| if (fieldsPresent) { |
| source = new HashMap<>(); |
| searchHit.getFields().forEach((key, value) -> { |
| source.put(key, value.getValues().size() == 1 ? value.getValue() : value.getValues()); |
| }); |
| } else { |
| source = searchHit.getSource(); |
| } |
| searchResult.setSource(source); |
| searchResult.setScore(searchHit.getScore()); |
| searchResult.setIndex(searchHit.getIndex()); |
| return searchResult; |
| } |
| |
| private String getFacentAggregationName(String field) { |
| return String.format("%s_count", field); |
| } |
| |
| public TransportClient getClient() { |
| return client; |
| } |
| |
| private String getGroupByAggregationName(String field) { |
| return String.format("%s_group", field); |
| } |
| |
| private String getSumAggregationName(String field) { |
| return String.format("%s_score", field); |
| } |
| |
| public ElasticsearchDao client(TransportClient client) { |
| this.client = client; |
| return this; |
| } |
| |
| public ElasticsearchDao columnMetadataDao(ColumnMetadataDao columnMetadataDao) { |
| this.columnMetadataDao = columnMetadataDao; |
| return this; |
| } |
| |
| public ElasticsearchDao accessConfig(AccessConfig accessConfig) { |
| this.accessConfig = accessConfig; |
| return this; |
| } |
| } |