blob: 2c42b08c369d8cea62338c95693da02c87d91ad6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.elasticsearch.dao;
import static org.apache.metron.common.Constants.GUID;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.constantScoreQuery;
import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.commons.collections4.SetUtils;
import org.apache.lucene.search.join.ScoreMode;
import org.apache.metron.common.Constants;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.IndexDao;
import org.apache.metron.indexing.dao.MetaAlertDao;
import org.apache.metron.indexing.dao.MultiIndexDao;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
import org.apache.metron.indexing.dao.metaalert.MetaScores;
import org.apache.metron.indexing.dao.search.FieldType;
import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.search.GroupRequest;
import org.apache.metron.indexing.dao.search.GroupResponse;
import org.apache.metron.indexing.dao.search.InvalidCreateException;
import org.apache.metron.indexing.dao.search.InvalidSearchException;
import org.apache.metron.indexing.dao.search.SearchRequest;
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.search.SearchResult;
import org.apache.metron.indexing.dao.update.Document;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetRequest.Item;
import org.elasticsearch.action.get.MultiGetRequestBuilder;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
import org.apache.metron.indexing.dao.update.PatchRequest;
import org.apache.metron.stellar.common.utils.ConversionUtils;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
public class ElasticsearchMetaAlertDao implements MetaAlertDao {
/** Document key for the sensor type: {@link Constants#SENSOR_TYPE} with every '.' replaced by ':'. */
public static final String SOURCE_TYPE = Constants.SENSOR_TYPE.replace('.', ':');
// Patch paths that isPatchAllowed rejects; those fields are changed through dedicated methods.
private static final String STATUS_PATH = "/status";
private static final String ALERT_PATH = "/alert";
// The wrapped DAO that reads and writes are delegated to; assigned in init(IndexDao, Optional).
private IndexDao indexDao;
// The Elasticsearch DAO found in (or equal to) indexDao; used for client access and index lookups.
private ElasticsearchDao elasticsearchDao;
// Index searched by queryAllResults and targeted by the add/remove/status meta alert updates.
private String index = METAALERTS_INDEX;
// Name of the document field that holds the threat triage score.
private String threatTriageField = THREAT_FIELD_DEFAULT;
/**
* Defines which summary aggregation is used to represent the overall threat triage score for
* the metaalert. The summary aggregation is applied to the threat triage score of all child alerts.
*
* This overall score is primarily used for sorting; hence it is called the 'threatSort'. This
* can be either max, min, average, count, median, or sum.
*/
private String threatSort = THREAT_SORT_DEFAULT;
// Number of hits requested per page when queryAllResults pages through a result set.
private int pageSize = 500;
/**
* Wraps an {@link org.apache.metron.indexing.dao.IndexDao} to handle meta alerts, using
* {@code METAALERTS_INDEX}, {@code THREAT_FIELD_DEFAULT} and {@code THREAT_SORT_DEFAULT}
* for the index, threat field and threat sort.
* @param indexDao The Dao to wrap
*/
public ElasticsearchMetaAlertDao(IndexDao indexDao) {
this(indexDao, METAALERTS_INDEX, THREAT_FIELD_DEFAULT, THREAT_SORT_DEFAULT);
}
/**
 * Wraps an {@link org.apache.metron.indexing.dao.IndexDao} to handle meta alerts.
 *
 * @param indexDao The Dao to wrap
 * @param index The name of the index that meta alerts are searched in and written to
 * @param triageLevelField The field name to use as the threat scoring field
 * @param threatSort The summary aggregation of all child threat triage scores used
 *                   as the overall threat triage score for the metaalert. This
 *                   can be either max, min, average, count, median, or sum.
 *                   If {@code null}, the default threat sort is kept.
 * @throws IllegalArgumentException if {@code indexDao} is rejected by
 *                                  {@link #init(IndexDao, Optional)}
 */
public ElasticsearchMetaAlertDao(IndexDao indexDao, String index, String triageLevelField, String threatSort) {
  // ofNullable rather than of: a null threatSort leaves the field at its default
  // instead of failing with a NullPointerException before the DAO is even wired up.
  init(indexDao, Optional.ofNullable(threatSort));
  this.index = index;
  this.threatTriageField = triageLevelField;
}
/**
* Creates an instance without a wrapped DAO. The DAO fields are only assigned by
* {@link #init(IndexDao, Optional)}, so that method has to be called before the instance is used.
*/
public ElasticsearchMetaAlertDao() {
//uninitialized.
}
/**
 * Initializes this implementation by setting the supplied IndexDao and also setting a separate
 * ElasticsearchDao. The ElasticsearchDao is needed for some specific Elasticsearch functions
 * (looking up an index from a GUID for example).
 *
 * <p>The supplied DAO must either be an {@link ElasticsearchDao} or a {@link MultiIndexDao}
 * that contains one. If a MultiIndexDao contains several, the last one listed is used.
 * Nothing is assigned when the DAO is rejected.
 *
 * @param indexDao The DAO to wrap for our queries
 * @param threatSort The summary aggregation of the child threat triage scores used
 *                   as the overall threat triage score for the metaalert. This
 *                   can be either max, min, average, count, median, or sum.
 *                   When empty, the current threat sort is left unchanged.
 * @throws IllegalArgumentException if no ElasticsearchDao can be obtained from {@code indexDao}
 */
@Override
public void init(IndexDao indexDao, Optional<String> threatSort) {
  ElasticsearchDao esDao = null;
  if (indexDao instanceof MultiIndexDao) {
    for (IndexDao childDao : ((MultiIndexDao) indexDao).getIndices()) {
      if (childDao instanceof ElasticsearchDao) {
        esDao = (ElasticsearchDao) childDao;
      }
    }
  } else if (indexDao instanceof ElasticsearchDao) {
    esDao = (ElasticsearchDao) indexDao;
  }

  // A MultiIndexDao without an Elasticsearch child used to be accepted here and only failed
  // later with a NullPointerException on the first query. Reject it up front instead.
  if (esDao == null) {
    throw new IllegalArgumentException(
        "Need an ElasticsearchDao when using ElasticsearchMetaAlertDao"
    );
  }

  this.indexDao = indexDao;
  this.elasticsearchDao = esDao;
  threatSort.ifPresent(sort -> this.threatSort = sort);
}
/**
* Intentionally a no-op: this class wraps a child DAO supplied through
* {@link #init(IndexDao, Optional)} and reads nothing from the access configuration.
* @param config Ignored
*/
@Override
public void init(AccessConfig config) {
// Do nothing. We're just wrapping a child dao
}
/**
 * Finds every active meta alert whose nested alert list contains the given alert.
 *
 * @param guid The GUID of the child alert; must not be null or blank
 * @return All matching meta alerts, gathered across every result page
 * @throws InvalidSearchException If {@code guid} is null or blank
 */
@Override
public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException {
  if (guid == null || guid.trim().isEmpty()) {
    throw new InvalidSearchException("Guid cannot be empty");
  }
  // Same nested query as the internal lookup; share it rather than keeping two copies in sync.
  return getMetaAlertsForAlert(guid);
}
/**
 * Creates a new meta alert from the requested alerts and links each child alert back to it.
 *
 * <p>The meta alert document is written to the index this DAO is configured with. Every child
 * alert that does not already reference the new meta alert is updated as well, in the index
 * named by its request or, failing that, the index looked up from Elasticsearch.
 *
 * @param request The alerts to group and the UI groups used to create the meta alert
 * @return A response flagged as created and carrying the GUID of the new meta alert
 * @throws InvalidCreateException If the request has no alerts or no groups, or if writing the
 *                                updates fails
 * @throws IOException If the child alerts cannot be retrieved
 * @throws IllegalArgumentException If no index can be determined for a child alert
 */
@Override
public MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request)
    throws InvalidCreateException, IOException {
  List<GetRequest> alertRequests = request.getAlerts();
  if (alertRequests == null || alertRequests.isEmpty()) {
    throw new InvalidCreateException("MetaAlertCreateRequest must contain alerts");
  }
  if (request.getGroups() == null || request.getGroups().isEmpty()) {
    throw new InvalidCreateException("MetaAlertCreateRequest must contain UI groups");
  }

  // Retrieve the documents going into the meta alert and build it
  Iterable<Document> alerts = indexDao.getAllLatest(alertRequests);
  Document metaAlert = buildCreateDocument(alerts, request.getGroups());
  calculateMetaScores(metaAlert);
  // Add source type to be consistent with other sources and allow filtering
  metaAlert.getDocument().put(SOURCE_TYPE, MetaAlertDao.METAALERT_TYPE);

  // Start a list of updates / inserts we need to run. The meta alert goes to the configured
  // index, which is the same index the queries and the add/remove/status updates use.
  Map<Document, Optional<String>> updates = new HashMap<>();
  updates.put(metaAlert, Optional.of(index));

  try {
    // Plain maps instead of Collectors.toMap: a GUID listed twice or a request without a
    // sensor type must not abort the whole creation. For duplicates the last request wins.
    Map<String, Optional<String>> guidToIndices = new HashMap<>();
    Map<String, String> guidToSensorTypes = new HashMap<>();
    for (GetRequest alertRequest : alertRequests) {
      guidToIndices.put(alertRequest.getGuid(), alertRequest.getIndex());
      guidToSensorTypes.put(alertRequest.getGuid(), alertRequest.getSensorType());
    }

    // Link each alert to the new meta alert, making sure existing links are maintained.
    for (Document alert : alerts) {
      if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) {
        // Use the index in the request if it exists
        Optional<String> alertIndex = guidToIndices.get(alert.getGuid());
        if (alertIndex == null || !alertIndex.isPresent()) {
          // Look up the index from Elasticsearch if one is not supplied in the request
          alertIndex = elasticsearchDao
              .getIndexName(alert.getGuid(), guidToSensorTypes.get(alert.getGuid()));
          if (!alertIndex.isPresent()) {
            throw new IllegalArgumentException("Could not find index for " + alert.getGuid());
          }
        }
        updates.put(alert, alertIndex);
      }
    }

    // Kick off any updates.
    indexDaoUpdate(updates);

    MetaAlertCreateResponse createResponse = new MetaAlertCreateResponse();
    createResponse.setCreated(true);
    createResponse.setGuid(metaAlert.getGuid());
    return createResponse;
  } catch (IOException ioe) {
    throw new InvalidCreateException("Unable to create meta alert", ioe);
  }
}
/**
 * Adds the requested alerts to an active meta alert and links each alert back to it.
 *
 * @param metaAlertGuid The GUID of the meta alert to extend
 * @param alertRequests The alerts to add
 * @return true if at least one alert was added, false if all of them were already present
 * @throws IOException If a document cannot be retrieved or an update fails
 * @throws IllegalArgumentException If no meta alert exists for {@code metaAlertGuid}
 * @throws IllegalStateException If the meta alert is not active
 */
@Override
public boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
    throws IOException {
  Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE);
  // getLatest yields null for an unknown GUID; report that instead of hitting an NPE below.
  if (metaAlert == null) {
    throw new IllegalArgumentException("Could not find meta alert " + metaAlertGuid);
  }
  if (!MetaAlertStatus.ACTIVE.getStatusString().equals(metaAlert.getDocument().get(STATUS_FIELD))) {
    throw new IllegalStateException("Adding alerts to an INACTIVE meta alert is not allowed");
  }

  Iterable<Document> alerts = indexDao.getAllLatest(alertRequests);
  if (!addAlertsToMetaAlert(metaAlert, alerts)) {
    return false;
  }

  // The child list changed, so the summary scores have to be recomputed before writing.
  calculateMetaScores(metaAlert);
  Map<Document, Optional<String>> updates = new HashMap<>();
  updates.put(metaAlert, Optional.of(index));
  for (Document alert : alerts) {
    if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) {
      updates.put(alert, Optional.empty());
    }
  }
  indexDaoUpdate(updates);
  return true;
}
/**
 * Appends alerts to the in-memory alert list of a meta alert, skipping any alert whose GUID
 * is already in the list. Nothing is written to the index here.
 *
 * @param metaAlert The meta alert document to modify
 * @param alerts The candidate alerts to add
 * @return true if at least one alert was appended
 */
@SuppressWarnings("unchecked")
protected boolean addAlertsToMetaAlert(Document metaAlert, Iterable<Document> alerts) {
  List<Map<String, Object>> currentAlerts =
      (List<Map<String, Object>>) metaAlert.getDocument().get(ALERT_FIELD);
  Set<String> knownGuids = currentAlerts.stream()
      .map(currentAlert -> (String) currentAlert.get(GUID))
      .collect(Collectors.toCollection(HashSet::new));

  boolean alertAdded = false;
  for (Document alert : alerts) {
    // Set.add is false for a GUID that is already known. Recording each GUID as it is added
    // also stops an alert that appears twice in the input from being appended twice.
    if (knownGuids.add(alert.getGuid())) {
      currentAlerts.add(alert.getDocument());
      alertAdded = true;
    }
  }
  return alertAdded;
}
/**
 * Records a meta alert GUID on an alert document, unless the alert already lists it.
 * The alert is only modified in memory, and only when the GUID was missing.
 *
 * @param metaAlertGuid The GUID of the meta alert to reference
 * @param alert The alert document to modify
 * @return true if the GUID was added, false if it was already listed
 */
protected boolean addMetaAlertToAlert(String metaAlertGuid, Document alert) {
  List<String> existingGuids = (List<String>) alert.getDocument()
      .get(MetaAlertDao.METAALERT_FIELD);

  // Work on a copy so the stored list is replaced rather than edited in place.
  List<String> updatedGuids = new ArrayList<>();
  if (existingGuids != null) {
    updatedGuids.addAll(existingGuids);
  }

  if (updatedGuids.contains(metaAlertGuid)) {
    return false;
  }
  updatedGuids.add(metaAlertGuid);
  alert.getDocument().put(MetaAlertDao.METAALERT_FIELD, updatedGuids);
  return true;
}
/**
 * Removes the requested alerts from an active meta alert and unlinks each alert from it.
 *
 * @param metaAlertGuid The GUID of the meta alert to shrink
 * @param alertRequests The alerts to remove
 * @return true if at least one alert was removed, false if none of them were present
 * @throws IOException If a document cannot be retrieved or an update fails
 * @throws IllegalArgumentException If no meta alert exists for {@code metaAlertGuid}
 * @throws IllegalStateException If the meta alert is not active
 */
@Override
public boolean removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
    throws IOException {
  Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE);
  // getLatest yields null for an unknown GUID; report that instead of hitting an NPE below.
  if (metaAlert == null) {
    throw new IllegalArgumentException("Could not find meta alert " + metaAlertGuid);
  }
  if (!MetaAlertStatus.ACTIVE.getStatusString().equals(metaAlert.getDocument().get(STATUS_FIELD))) {
    throw new IllegalStateException("Removing alerts from an INACTIVE meta alert is not allowed");
  }

  Iterable<Document> alerts = indexDao.getAllLatest(alertRequests);
  // A set keeps the per-alert membership test cheap while filtering the child list.
  Collection<String> alertGuids = alertRequests.stream()
      .map(GetRequest::getGuid)
      .collect(Collectors.toSet());
  if (!removeAlertsFromMetaAlert(metaAlert, alertGuids)) {
    return false;
  }

  // The child list changed, so the summary scores have to be recomputed before writing.
  calculateMetaScores(metaAlert);
  Map<Document, Optional<String>> updates = new HashMap<>();
  updates.put(metaAlert, Optional.of(index));
  for (Document alert : alerts) {
    if (removeMetaAlertFromAlert(metaAlert.getGuid(), alert)) {
      updates.put(alert, Optional.empty());
    }
  }
  indexDaoUpdate(updates);
  return true;
}
/**
 * Drops every child alert whose GUID is in the given collection from the in-memory alert
 * list of a meta alert. Nothing is written to the index here.
 *
 * @param metaAlert The meta alert document to modify
 * @param alertGuids The GUIDs of the alerts to drop
 * @return true if at least one alert was dropped
 */
protected boolean removeAlertsFromMetaAlert(Document metaAlert, Collection<String> alertGuids) {
  List<Map<String, Object>> childAlerts =
      (List<Map<String, Object>>) metaAlert.getDocument().get(ALERT_FIELD);
  // removeIf reports whether anything was removed, which is exactly the size-changed check.
  return childAlerts.removeIf(child -> alertGuids.contains((String) child.get(GUID)));
}
/**
 * Removes a meta alert GUID from the list kept on an alert document.
 * The alert is only modified in memory, and only when the GUID was listed.
 *
 * @param metaAlertGuid The GUID of the meta alert to unlink
 * @param alert The alert document to modify
 * @return true if the GUID was listed and has been removed
 */
protected boolean removeMetaAlertFromAlert(String metaAlertGuid, Document alert) {
  List<String> existingGuids = (List<String>) alert.getDocument()
      .get(MetaAlertDao.METAALERT_FIELD);

  // Work on a copy so the stored list is replaced rather than edited in place.
  List<String> remainingGuids = new ArrayList<>();
  if (existingGuids != null) {
    remainingGuids.addAll(existingGuids);
  }

  if (!remainingGuids.remove(metaAlertGuid)) {
    return false;
  }
  alert.getDocument().put(MetaAlertDao.METAALERT_FIELD, remainingGuids);
  return true;
}
/**
 * Changes the status of a meta alert and keeps its child alerts in step.
 *
 * <p>Activating adds the meta alert GUID to every child alert and refreshes the embedded
 * child documents with their latest versions. Deactivating removes the GUID from every
 * child alert. Nothing is written when the meta alert already has the requested status.
 *
 * @param metaAlertGuid The GUID of the meta alert to update
 * @param status The status to set
 * @return true if the status changed, false if it was already {@code status}
 * @throws IOException If a document cannot be retrieved or an update fails
 * @throws IllegalArgumentException If no meta alert exists for {@code metaAlertGuid}
 */
@Override
@SuppressWarnings("unchecked")
public boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status)
    throws IOException {
  Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE);
  // getLatest yields null for an unknown GUID; report that instead of hitting an NPE below.
  if (metaAlert == null) {
    throw new IllegalArgumentException("Could not find meta alert " + metaAlertGuid);
  }

  String currentStatus = (String) metaAlert.getDocument().get(MetaAlertDao.STATUS_FIELD);
  if (status.getStatusString().equals(currentStatus)) {
    return false;
  }
  metaAlert.getDocument().put(MetaAlertDao.STATUS_FIELD, status.getStatusString());

  // Fetch the latest version of every child alert currently embedded in the meta alert.
  List<Map<String, Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument()
      .get(MetaAlertDao.ALERT_FIELD);
  List<GetRequest> getRequests = currentAlerts.stream()
      .map(currentAlert -> new GetRequest(
          (String) currentAlert.get(GUID), (String) currentAlert.get(SOURCE_TYPE)))
      .collect(Collectors.toList());
  Iterable<Document> alerts = indexDao.getAllLatest(getRequests);

  boolean activating = MetaAlertStatus.ACTIVE.equals(status);
  boolean deactivating = MetaAlertStatus.INACTIVE.equals(status);
  Map<Document, Optional<String>> updates = new HashMap<>();
  List<Map<String, Object>> updatedAlerts = new ArrayList<>();
  for (Document alert : alerts) {
    boolean linkChanged = false;
    if (activating) {
      // If we're making it active, add the meta alert guid to every alert.
      linkChanged = addMetaAlertToAlert(metaAlert.getGuid(), alert);
    } else if (deactivating) {
      // If we're making it inactive, remove the meta alert guid from every alert.
      linkChanged = removeMetaAlertFromAlert(metaAlert.getGuid(), alert);
    }
    if (linkChanged) {
      updates.put(alert, Optional.empty());
    }
    updatedAlerts.add(alert.getDocument());
  }

  // Only an activated meta alert gets its embedded children replaced by the fresh copies.
  if (activating) {
    metaAlert.getDocument().put(MetaAlertDao.ALERT_FIELD, updatedAlerts);
  }
  updates.put(metaAlert, Optional.of(index));
  indexDaoUpdate(updates);
  return true;
}
/**
 * Runs a search that also matches meta alerts through their nested child alerts.
 *
 * <p>A document is returned when the query string matches it directly or matches one of its
 * nested alerts, it is either an active meta alert or has no status field at all (a plain
 * alert), and it does not carry the meta alert field (alerts that already belong to a meta
 * alert are left out).
 *
 * @param searchRequest The search to run
 * @return The response produced by the Elasticsearch DAO for the wrapped query
 * @throws InvalidSearchException If the underlying search is rejected
 */
@Override
public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException {
  String queryString = searchRequest.getQuery();

  // Match the document itself or any of its nested child alerts.
  QueryBuilder matchesSelfOrChild = boolQuery()
      .should(new QueryStringQueryBuilder(queryString))
      .should(nestedQuery(
          ALERT_FIELD,
          new QueryStringQueryBuilder(queryString),
          ScoreMode.None));

  // Ensures that it's a meta alert with active status or that it's an alert (signified by
  // having no status field)
  QueryBuilder activeMetaAlertOrAlert = boolQuery()
      .should(termQuery(MetaAlertDao.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()))
      .should(boolQuery().mustNot(existsQuery(MetaAlertDao.STATUS_FIELD)));

  QueryBuilder qb = constantScoreQuery(boolQuery()
      .must(matchesSelfOrChild)
      .must(activeMetaAlertOrAlert)
      .mustNot(existsQuery(MetaAlertDao.METAALERT_FIELD)));
  return elasticsearchDao.search(searchRequest, qb);
}
/**
 * Retrieves the latest version of a document by delegating to the wrapped DAO.
 *
 * @param guid The GUID of the document
 * @param sensorType The sensor type of the document
 * @return Whatever the wrapped DAO returns for the lookup
 * @throws IOException If the wrapped DAO fails
 */
@Override
public Document getLatest(String guid, String sensorType) throws IOException {
  Document latest = indexDao.getLatest(guid, sensorType);
  return latest;
}
/**
 * Retrieves the latest version of several documents by delegating to the wrapped DAO.
 *
 * @param getRequests The documents to look up
 * @return Whatever the wrapped DAO returns for the lookups
 * @throws IOException If the wrapped DAO fails
 */
@Override
public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException {
  Iterable<Document> latestDocuments = indexDao.getAllLatest(getRequests);
  return latestDocuments;
}
/**
 * Updates an alert and refreshes the copy of it embedded in every active meta alert that
 * contains it. All writes are submitted together as one batch to the wrapped DAO.
 *
 * @param update The alert update to apply; must not be a meta alert
 * @param index The index to write the alert to, if known
 * @throws IOException If the batch update fails
 * @throws UnsupportedOperationException If {@code update} is itself a meta alert
 */
@Override
public void update(Document update, Optional<String> index) throws IOException {
  if (METAALERT_TYPE.equals(update.getSensorType())) {
    // We've been passed an update to the meta alert.
    throw new UnsupportedOperationException("Meta alerts cannot be directly updated");
  }

  // We need to update an alert itself. Only that portion of the update can be delegated.
  Map<Document, Optional<String>> updates = new HashMap<>();
  updates.put(update, index);

  // We still need to get meta alerts potentially associated with it and update them too.
  for (SearchResult searchResult : getMetaAlertsForAlert(update.getGuid()).getResults()) {
    Document metaAlert =
        new Document(searchResult.getSource(), searchResult.getId(), METAALERT_TYPE, 0L);
    replaceAlertInMetaAlert(metaAlert, update);
    // Write back to the index the meta alert was found in. The parameter named `index`
    // shadows the field, hence the explicit `this`.
    updates.put(metaAlert, Optional.of(this.index));
  }

  // Run the alert's update
  indexDao.batchUpdate(updates);
}
/**
 * Swaps the embedded copy of an alert inside a meta alert for the supplied version.
 * The alert is only re-added when an entry with its GUID was actually removed, so a meta
 * alert that does not contain the alert is left untouched.
 *
 * @param metaAlert The meta alert document to modify
 * @param alert The new version of the alert
 * @return true if the meta alert contained the alert and it was replaced
 */
protected boolean replaceAlertInMetaAlert(Document metaAlert, Document alert) {
  Set<String> staleGuid = Collections.singleton(alert.getGuid());
  if (!removeAlertsFromMetaAlert(metaAlert, staleGuid)) {
    return false;
  }
  addAlertsToMetaAlert(metaAlert, Collections.singleton(alert));
  return true;
}
/**
* Always rejected: this DAO does not accept bulk updates.
* @param updates Ignored
* @throws UnsupportedOperationException On every call
*/
@Override
public void batchUpdate(Map<Document, Optional<String>> updates) throws IOException {
throw new UnsupportedOperationException("Meta alerts do not allow for bulk updates");
}
/**
 * Applies a patch through the wrapped DAO, refusing any patch that targets the "/alert" or
 * "/status" paths. Those fields must be changed with their dedicated methods.
 *
 * @param request The patch request
 * @param timestamp Optionally a timestamp to set. If not specified then current time is used.
 * @throws OriginalNotFoundException If the document to patch cannot be found
 * @throws IOException If the patched document cannot be written
 * @throws IllegalArgumentException If the patch touches a protected path
 */
@Override
public void patch(PatchRequest request, Optional<Long> timestamp)
    throws OriginalNotFoundException, IOException {
  if (!isPatchAllowed(request)) {
    throw new IllegalArgumentException("Meta alert patches are not allowed for /alert or /status paths. "
        + "Please use the add/remove alert or update status functions instead.");
  }
  Document patched = getPatchedDocument(request, timestamp);
  indexDao.update(patched, Optional.ofNullable(request.getIndex()));
}
/**
 * Decides whether a patch may be applied. A patch is refused as soon as one of its
 * operations has a "path" equal to "/status" or "/alert"; a request with no operations
 * is allowed.
 *
 * @param request The patch request to inspect
 * @return false if any operation targets a protected path, true otherwise
 */
protected boolean isPatchAllowed(PatchRequest request) {
  if (request.getPatch() == null || request.getPatch().isEmpty()) {
    return true;
  }
  for (Map<String, Object> operation : request.getPatch()) {
    // equals on the constant is false for null and for non-String values, so no type check
    // is needed before comparing.
    Object path = operation.get("path");
    if (STATUS_PATH.equals(path) || ALERT_PATH.equals(path)) {
      return false;
    }
  }
  return true;
}
/**
 * Given an alert GUID, retrieve all active meta alerts whose nested alert list contains it.
 *
 * @param alertGuid The GUID of the child alert
 * @return The meta alerts found, gathered across every result page
 */
protected SearchResponse getMetaAlertsForAlert(String alertGuid) {
  QueryBuilder childGuidMatches = boolQuery()
      .must(termQuery(ALERT_FIELD + "." + Constants.GUID, alertGuid));
  QueryBuilder containsAlert = nestedQuery(ALERT_FIELD, childGuidMatches, ScoreMode.None)
      .innerHit(new InnerHitBuilder());
  QueryBuilder isActive = termQuery(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString());

  QueryBuilder qb = boolQuery()
      .must(containsAlert)
      .must(isActive);
  return queryAllResults(qb);
}
/**
 * Elasticsearch queries default to 10 records returned. Some internal queries require that
 * all results are returned. Rather than setting an arbitrarily high size, this method pages
 * through results and returns them all in a single SearchResponse.
 *
 * <p>NOTE(review): pages are fetched with from/size offsets and no explicit sort, and
 * Elasticsearch limits how deep such offsets may go ({@code index.max_result_window}).
 * Confirm this is acceptable for the expected number of meta alerts per alert.
 *
 * @param qb The query to run against the meta alert index
 * @return A response holding the total hit count and the hits of every page
 */
protected SearchResponse queryAllResults(QueryBuilder qb) {
  SearchRequestBuilder searchRequestBuilder = elasticsearchDao
      .getClient()
      .prepareSearch(index)
      .addStoredField("*")
      .setFetchSource(true)
      .setQuery(qb)
      .setSize(pageSize);
  org.elasticsearch.action.search.SearchResponse esResponse = searchRequestBuilder
      .execute()
      .actionGet();
  // Copy into a list we own so the remaining pages can always be appended.
  List<SearchResult> allResults = new ArrayList<>(getSearchResults(esResponse));
  long total = esResponse.getHits().getTotalHits();

  // Walk the remaining offsets directly. The previous (total / pageSize) + 1 page count sent
  // one extra, empty request whenever total was an exact multiple of pageSize, and divided
  // by zero for a page size of 0. The pageSize > 0 guard also keeps this loop finite.
  for (long from = pageSize; pageSize > 0 && from < total; from += pageSize) {
    // toIntExact fails loudly instead of wrapping to a negative offset on overflow.
    searchRequestBuilder.setFrom(Math.toIntExact(from));
    esResponse = searchRequestBuilder
        .execute()
        .actionGet();
    allResults.addAll(getSearchResults(esResponse));
  }

  SearchResponse searchResponse = new SearchResponse();
  searchResponse.setTotal(total);
  searchResponse.setResults(allResults);
  return searchResponse;
}
/**
 * Transforms the hits of an Elasticsearch response into a list of SearchResults, copying
 * the id, source, score and index of each hit.
 *
 * @param searchResponse The Elasticsearch response to convert
 * @return One SearchResult per hit, in hit order
 */
protected List<SearchResult> getSearchResults(org.elasticsearch.action.search.SearchResponse searchResponse) {
  List<SearchResult> results = new ArrayList<>();
  for (SearchHit hit : searchResponse.getHits().getHits()) {
    SearchResult result = new SearchResult();
    result.setId(hit.getId());
    result.setSource(hit.getSource());
    result.setScore(hit.getScore());
    result.setIndex(hit.getIndex());
    results.add(result);
  }
  return results;
}
/**
 * Build the Document representing a meta alert to be created. The document gets a fresh
 * random GUID, the current time as its timestamp, the supplied groups, an active status and
 * the source of every child alert. Scores are not calculated here.
 *
 * @param alerts The child documents that go into the meta alert
 * @param groups The groups used to create this meta alert
 * @return A Document representing the new meta alert
 */
protected Document buildCreateDocument(Iterable<Document> alerts, List<String> groups) {
  // Need to create a Document from the multiget. Scores will be calculated later
  List<Map<String, Object>> alertList = new ArrayList<>();
  for (Document alert : alerts) {
    alertList.add(alert.getDocument());
  }

  String guid = UUID.randomUUID().toString();
  // Read the clock once so the timestamp field and the Document timestamp always agree.
  long createdAt = System.currentTimeMillis();

  Map<String, Object> metaSource = new HashMap<>();
  metaSource.put(ALERT_FIELD, alertList);
  // Add any meta fields
  metaSource.put(GUID, guid);
  metaSource.put(Constants.Fields.TIMESTAMP.getName(), createdAt);
  metaSource.put(GROUPS_FIELD, groups);
  metaSource.put(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString());

  return new Document(metaSource, guid, METAALERT_TYPE, createdAt);
}
/**
 * Sends updates to the wrapped DAO: nothing for an empty map, the single-update variant for
 * exactly one entry, and the batch variant for anything larger.
 *
 * @param updates The updates to run, keyed by document with the optional target index
 * @throws IOException If there's an update error
 */
protected void indexDaoUpdate(Map<Document, Optional<String>> updates) throws IOException {
  int updateCount = updates.size();
  if (updateCount == 0) {
    // Nothing to write.
    return;
  }
  if (updateCount > 1) {
    indexDao.batchUpdate(updates);
    return;
  }
  Entry<Document, Optional<String>> onlyUpdate = updates.entrySet().iterator().next();
  indexDao.update(onlyUpdate.getKey(), onlyUpdate.getValue());
}
/**
 * Looks up the current source of every child alert of a meta alert. The child GUIDs are
 * taken from the latest stored version of the meta alert and then fetched with an ids query.
 *
 * @param update A document carrying the GUID of the meta alert
 * @return The source of each child alert found; empty if the meta alert does not exist or
 *         has no child alerts
 * @throws IOException If the meta alert cannot be retrieved
 */
@SuppressWarnings("unchecked")
protected List<Map<String, Object>> getAllAlertsForMetaAlert(Document update) throws IOException {
  Document latest = indexDao.getLatest(update.getGuid(), MetaAlertDao.METAALERT_TYPE);
  if (latest == null) {
    return new ArrayList<>();
  }
  List<Map<String, Object>> latestAlerts = (List<Map<String, Object>>) latest.getDocument()
      .get(MetaAlertDao.ALERT_FIELD);
  // No children means there is nothing to look up, so skip the search entirely.
  if (latestAlerts == null || latestAlerts.isEmpty()) {
    return new ArrayList<>();
  }

  String[] guids = latestAlerts.stream()
      .map(alert -> (String) alert.get(Constants.GUID))
      .toArray(String[]::new);
  QueryBuilder query = QueryBuilders.idsQuery().addIds(guids);
  SearchRequestBuilder request = elasticsearchDao.getClient().prepareSearch()
      .setQuery(query)
      // Elasticsearch returns only 10 hits unless told otherwise; without an explicit size
      // a meta alert with more than 10 children came back truncated.
      .setSize(guids.length);
  org.elasticsearch.action.search.SearchResponse response = request.get();

  List<Map<String, Object>> alerts = new ArrayList<>();
  for (SearchHit hit : response.getHits().getHits()) {
    alerts.add(hit.sourceAsMap());
  }
  return alerts;
}
/**
 * Builds an update Document whose only field is the list of meta alerts an alert belongs to.
 *
 * @param alertGuid The GUID of the alert to update
 * @param sensorType The sensor type to update
 * @param metaAlertField The new metaAlertList to use
 * @param timestamp The timestamp to give the update Document
 * @return The update Document
 */
protected Document buildAlertUpdate(String alertGuid, String sensorType,
    List<String> metaAlertField, Long timestamp) {
  Map<String, Object> fields = new HashMap<>();
  fields.put(MetaAlertDao.METAALERT_FIELD, metaAlertField);
  return new Document(fields, alertGuid, sensorType, timestamp);
}
/**
 * Retrieves column metadata for the given indices by delegating to the wrapped DAO.
 *
 * @param indices The indices to describe
 * @return Whatever the wrapped DAO returns for those indices
 * @throws IOException If the wrapped DAO fails
 */
@Override
public Map<String, FieldType> getColumnMetadata(List<String> indices)
    throws IOException {
  Map<String, FieldType> columnMetadata = indexDao.getColumnMetadata(indices);
  return columnMetadata;
}
/**
 * Runs a group request while hiding any alerts already contained in meta alerts, that is
 * any document that carries the meta alert field.
 *
 * @param groupRequest The grouping to run
 * @return The response produced by the Elasticsearch DAO for the wrapped query
 * @throws InvalidSearchException If the underlying grouping is rejected
 */
@Override
public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException {
  QueryBuilder requestedQuery = new QueryStringQueryBuilder(groupRequest.getQuery());
  QueryBuilder belongsToMetaAlert = existsQuery(MetaAlertDao.METAALERT_FIELD);
  QueryBuilder qb = QueryBuilders.boolQuery()
      .must(requestedQuery)
      .mustNot(belongsToMetaAlert);
  return elasticsearchDao.group(groupRequest, qb);
}
/**
 * Calculates the meta alert scores for a Document and stores them on the Document itself.
 *
 * <p>The threat triage score of every child alert is collected (children without a usable
 * score are skipped), the resulting summary statistics are written into the meta alert, and
 * the statistic selected by the configured threat sort is written to the threat triage field
 * as a float.
 *
 * @param metaAlert The Document containing the child alerts; modified in place
 */
@SuppressWarnings("unchecked")
protected void calculateMetaScores(Document metaAlert) {
  List<Double> childScores = new ArrayList<>();
  List<Object> children = (List<Object>) metaAlert.getDocument().get(ALERT_FIELD);
  if (children != null) {
    for (Object child : children) {
      Object rawScore = ((Map<String, Object>) child).get(threatTriageField);
      Double score = parseThreatField(rawScore);
      if (score != null) {
        childScores.add(score);
      }
    }
  }
  // An absent or empty child list leaves childScores empty, giving the same empty summary.
  MetaScores metaScores = new MetaScores(childScores);

  // add a summary (max, min, avg, ...) of all the threat scores from the child alerts
  metaAlert.getDocument().putAll(metaScores.getMetaScores());

  // add the overall threat score for the metaalert; one of the summary aggregations as
  // defined by `threatSort`. It is stored as a float because the type needs to match the
  // threat score field from each of the sensor indices.
  Object overallScore = metaScores.getMetaScores().get(threatSort);
  metaAlert.getDocument().put(threatTriageField, ConversionUtils.convert(overallScore, Float.class));
}
/**
 * Converts a raw threat score to a Double. Numbers are widened, Strings are parsed with
 * {@link Double#parseDouble(String)}, and anything else (including null) yields null.
 *
 * @param threatRaw The raw value read from an alert
 * @return The score as a Double, or null if the value is neither a Number nor a String
 * @throws NumberFormatException If a String value is not a parsable double
 */
private Double parseThreatField(Object threatRaw) {
  if (threatRaw instanceof Number) {
    return ((Number) threatRaw).doubleValue();
  }
  if (threatRaw instanceof String) {
    return Double.parseDouble((String) threatRaw);
  }
  return null;
}
/**
* @return The number of hits requested per page when paging through query results
*/
public int getPageSize() {
return pageSize;
}
/**
* Sets the number of hits requested per page when paging through query results.
* The value is stored as given; no validation is performed here.
* @param pageSize The page size to use
*/
public void setPageSize(int pageSize) {
this.pageSize = pageSize;
}
}