blob: 95d27db31c1eb5741e17c67799e5f94ebeefbb3a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.elasticsearch.dao;
import org.apache.metron.common.Constants;
import org.apache.metron.elasticsearch.client.ElasticsearchClient;
import org.apache.metron.indexing.dao.RetrieveLatestDao;
import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.search.InvalidSearchException;
import org.apache.metron.indexing.dao.update.Document;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.termsQuery;
import static org.elasticsearch.index.query.QueryBuilders.typeQuery;
public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao {
private ElasticsearchClient client;
private ElasticsearchRequestSubmitter submitter;
public ElasticsearchRetrieveLatestDao(ElasticsearchClient client) {
this.client = client;
this.submitter = new ElasticsearchRequestSubmitter(client);
}
@Override
public Document getLatest(String guid, String sensorType) throws IOException {
Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(hit));
return doc.orElse(null);
}
@Override
public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException {
Collection<String> guids = new HashSet<>();
Collection<String> sensorTypes = new HashSet<>();
for (GetRequest getRequest : getRequests) {
guids.add(getRequest.getGuid());
sensorTypes.add(getRequest.getSensorType());
}
List<Document> documents = searchByGuids(guids, sensorTypes, hit -> toDocument(hit));
return documents;
}
<T> Optional<T> searchByGuid(String guid, String sensorType,
Function<SearchHit, Optional<T>> callback) throws IOException {
Collection<String> sensorTypes = sensorType != null ? Collections.singleton(sensorType) : null;
List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, callback);
if (results.size() > 0) {
return Optional.of(results.get(0));
} else {
return Optional.empty();
}
}
/**
* Return the search hit based on the UUID and sensor type.
* A callback can be specified to transform the hit into a type T.
* If more than one hit happens, the first one will be returned.
*/
<T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes,
Function<SearchHit, Optional<T>> callback) throws IOException {
if (guids == null || guids.isEmpty()) {
return Collections.emptyList();
}
// should match any of the guids
// the 'guid' field must be of type 'keyword' or this term query will not match
BoolQueryBuilder guidQuery = boolQuery().must(termsQuery(Constants.GUID, guids));
// should match any of the sensor types
BoolQueryBuilder sensorQuery = boolQuery();
sensorTypes.forEach(sensorType -> sensorQuery.should(typeQuery(sensorType + "_doc")));
// must have a match for both guid and sensor
BoolQueryBuilder query = boolQuery()
.must(guidQuery)
.must(sensorQuery);
// submit the search
SearchResponse response;
try {
SearchSourceBuilder source = new SearchSourceBuilder()
.query(query)
.size(guids.size());
SearchRequest request = new SearchRequest().source(source);
response = submitter.submitSearch(request);
} catch(InvalidSearchException e) {
throw new IOException(e);
}
// transform the search hits to results using the callback
List<T> results = new ArrayList<>();
for(SearchHit hit: response.getHits()) {
Optional<T> result = callback.apply(hit);
result.ifPresent(r -> results.add(r));
}
return results;
}
private Optional<Document> toDocument(SearchHit hit) {
Document document = Document.fromJSON(hit.getSource());
document.setDocumentID(hit.getId());
return Optional.of(document);
}
}