blob: 519e8031409b0f1005afc602fb94341a6a52f3f9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.elasticsearch.dao;
import static org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao.METAALERTS_INDEX;
import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.lucene.search.join.ScoreMode;
import org.apache.metron.common.Constants;
import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
import org.apache.metron.indexing.dao.metaalert.MetaAlertConfig;
import org.apache.metron.indexing.dao.metaalert.MetaAlertConstants;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
import org.apache.metron.indexing.dao.metaalert.MetaAlertRetrieveLatestDao;
import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
import org.apache.metron.indexing.dao.metaalert.MetaScores;
import org.apache.metron.indexing.dao.metaalert.lucene.AbstractLuceneMetaAlertUpdateDao;
import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.search.InvalidCreateException;
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.search.SearchResult;
import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
import org.apache.metron.indexing.dao.update.Document;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.InnerHitBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
public class ElasticsearchMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpdateDao {
private static final String INDEX_NOT_FOUND_INDICES_KEY = "es.index";
private ElasticsearchDao elasticsearchDao;
private MetaAlertRetrieveLatestDao retrieveLatestDao;
private int pageSize;
/**
* Constructor an ElasticsearchMetaAlertUpdateDao
* @param elasticsearchDao An UpdateDao to defer queries to.
* @param retrieveLatestDao A RetrieveLatestDao for getting the current state of items being
* mutated.
* @param config The meta alert config to use.
*/
public ElasticsearchMetaAlertUpdateDao(
ElasticsearchDao elasticsearchDao,
MetaAlertRetrieveLatestDao retrieveLatestDao,
MetaAlertConfig config,
int pageSize
) {
super(elasticsearchDao, retrieveLatestDao, config);
this.elasticsearchDao = elasticsearchDao;
this.retrieveLatestDao = retrieveLatestDao;
this.pageSize = pageSize;
}
@Override
@SuppressWarnings("unchecked")
public Document createMetaAlert(MetaAlertCreateRequest request)
throws InvalidCreateException, IOException {
List<GetRequest> alertRequests = request.getAlerts();
if (request.getAlerts().isEmpty()) {
throw new InvalidCreateException("MetaAlertCreateRequest must contain alerts");
}
if (request.getGroups().isEmpty()) {
throw new InvalidCreateException("MetaAlertCreateRequest must contain UI groups");
}
// Retrieve the documents going into the meta alert and build it
Iterable<Document> alerts = retrieveLatestDao.getAllLatest(alertRequests);
Document metaAlert = buildCreateDocument(alerts, request.getGroups(),
MetaAlertConstants.ALERT_FIELD);
MetaScores
.calculateMetaScores(metaAlert, getConfig().getThreatTriageField(),
getConfig().getThreatSort());
// Add source type to be consistent with other sources and allow filtering
metaAlert.getDocument()
.put(getConfig().getSourceTypeField(), MetaAlertConstants.METAALERT_TYPE);
// Start a list of updates / inserts we need to run
Map<Document, Optional<String>> updates = new HashMap<>();
updates.put(metaAlert, Optional.of(getConfig().getMetaAlertIndex()));
try {
// We need to update the associated alerts with the new meta alerts, making sure existing
// links are maintained.
Map<String, Optional<String>> guidToIndices = alertRequests.stream().collect(Collectors.toMap(
GetRequest::getGuid, GetRequest::getIndex));
Map<String, String> guidToSensorTypes = alertRequests.stream().collect(Collectors.toMap(
GetRequest::getGuid, GetRequest::getSensorType));
for (Document alert : alerts) {
if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) {
// Use the index in the request if it exists
Optional<String> index = guidToIndices.get(alert.getGuid());
if (!index.isPresent()) {
// Look up the index from Elasticsearch if one is not supplied in the request
index = elasticsearchDao
.getIndexName(alert.getGuid(), guidToSensorTypes.get(alert.getGuid()));
if (!index.isPresent()) {
throw new IllegalArgumentException("Could not find index for " + alert.getGuid());
}
}
updates.put(alert, index);
}
}
// Kick off any updates.
update(updates);
return metaAlert;
} catch (IOException ioe) {
throw new InvalidCreateException("Unable to create meta alert", ioe);
}
}
@Override
public Document update(Document update, Optional<String> index) throws IOException {
if (MetaAlertConstants.METAALERT_TYPE.equals(update.getSensorType())) {
// We've been passed an update to the meta alert.
throw new UnsupportedOperationException("Meta alerts cannot be directly updated");
} else {
Map<Document, Optional<String>> updates = new HashMap<>();
updates.put(update, index);
try {
// We need to update an alert itself. Only that portion of the update can be delegated.
// We still need to get meta alerts potentially associated with it and update.
SearchResponse response = getMetaAlertsForAlert(update.getGuid());
Collection<Document> metaAlerts = response
.getResults()
.stream()
.map(result -> toDocument(result, update.getTimestamp()))
.collect(Collectors.toList());
// Each meta alert needs to be updated with the new alert
for (Document metaAlert : metaAlerts) {
replaceAlertInMetaAlert(metaAlert, update);
updates.put(metaAlert, Optional.of(METAALERTS_INDEX));
}
} catch (IndexNotFoundException e) {
List<String> indicesNotFound = e.getMetadata(INDEX_NOT_FOUND_INDICES_KEY);
// If no metaalerts have been created yet and the metaalerts index does not exist, assume no metaalerts exist for alert.
// Otherwise throw the exception.
if (indicesNotFound.size() != 1 || !METAALERTS_INDEX.equals(indicesNotFound.get(0))) {
throw e;
}
}
// Run the alert's update
elasticsearchDao.batchUpdate(updates);
return update;
}
}
private Document toDocument(SearchResult result, Long timestamp) {
Document document = Document.fromJSON(result.getSource());
document.setTimestamp(timestamp);
document.setDocumentID(result.getId());
return document;
}
@Override
public Document addCommentToAlert(CommentAddRemoveRequest request) throws IOException {
return getUpdateDao().addCommentToAlert(request);
}
@Override
public Document removeCommentFromAlert(CommentAddRemoveRequest request) throws IOException {
return getUpdateDao().removeCommentFromAlert(request);
}
@Override
public Document addCommentToAlert(CommentAddRemoveRequest request, Document latest)
throws IOException {
return getUpdateDao().addCommentToAlert(request, latest);
}
@Override
public Document removeCommentFromAlert(CommentAddRemoveRequest request, Document latest)
throws IOException {
return getUpdateDao().removeCommentFromAlert(request, latest);
}
/**
* Given an alert GUID, retrieve all associated meta alerts.
* @param alertGuid The GUID of the child alert
* @return The Elasticsearch response containing the meta alerts
*/
protected SearchResponse getMetaAlertsForAlert(String alertGuid) throws IOException {
QueryBuilder qb = boolQuery()
.must(
nestedQuery(
MetaAlertConstants.ALERT_FIELD,
boolQuery()
.must(termQuery(MetaAlertConstants.ALERT_FIELD + "." + Constants.GUID,
alertGuid)),
ScoreMode.None
).innerHit(new InnerHitBuilder())
)
.must(termQuery(MetaAlertConstants.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
return ElasticsearchUtils
.queryAllResults(elasticsearchDao.getClient().getHighLevelClient(), qb, getConfig().getMetaAlertIndex(),
pageSize);
}
protected void replaceAlertInMetaAlert(Document metaAlert, Document alert) {
boolean metaAlertUpdated = removeAlertsFromMetaAlert(metaAlert,
Collections.singleton(alert.getGuid()));
if (metaAlertUpdated) {
addAlertsToMetaAlert(metaAlert, Collections.singleton(alert));
}
}
}