blob: 9072f6a28b192205de90344ef605f8b59b3dd343 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.solr.dao;
import static org.apache.metron.solr.dao.SolrMetaAlertDao.METAALERTS_COLLECTION;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.metron.indexing.dao.metaalert.MetaAlertConfig;
import org.apache.metron.indexing.dao.metaalert.MetaAlertConstants;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
import org.apache.metron.indexing.dao.metaalert.MetaAlertUpdateDao;
import org.apache.metron.indexing.dao.metaalert.MetaScores;
import org.apache.metron.indexing.dao.metaalert.lucene.AbstractLuceneMetaAlertUpdateDao;
import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.search.InvalidCreateException;
import org.apache.metron.indexing.dao.search.InvalidSearchException;
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.search.SearchResult;
import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
import org.apache.metron.indexing.dao.update.Document;
import org.apache.metron.indexing.dao.update.UpdateDao;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
/**
 * Solr implementation of the meta alert update DAO.
 *
 * <p>Writes are delegated to the update DAO supplied to the superclass; this class adds the
 * meta-alert bookkeeping around those writes and issues explicit commits on the
 * {@code SolrClient} for the collections it touches.
 */
public class SolrMetaAlertUpdateDao extends AbstractLuceneMetaAlertUpdateDao implements
    MetaAlertUpdateDao, UpdateDao {

  private final SolrClient solrClient;
  private final SolrMetaAlertSearchDao metaAlertSearchDao;

  /**
   * Constructs a SolrMetaAlertUpdateDao.
   * @param solrClient The Solr client used to commit collections after writes.
   * @param solrDao A SolrDao to defer queries to.
   * @param metaAlertSearchDao A MetaAlert aware search DAO used in retrieving items being mutated.
   * @param retrieveLatestDao A RetrieveLatestDao for getting the current state of items being
   *     mutated.
   * @param config The meta alert configuration (threat triage field, threat sort and source
   *     type field are read from it when creating meta alerts).
   */
  public SolrMetaAlertUpdateDao(SolrClient solrClient,
      SolrDao solrDao,
      SolrMetaAlertSearchDao metaAlertSearchDao,
      SolrMetaAlertRetrieveLatestDao retrieveLatestDao,
      MetaAlertConfig config) {
    super(solrDao, retrieveLatestDao, config);
    this.solrClient = solrClient;
    this.metaAlertSearchDao = metaAlertSearchDao;
  }

  /**
   * Builds a new meta alert from the requested alerts, links each alert back to the meta alert,
   * writes everything and commits the meta alerts collection.
   * @param request The alerts and UI groups that make up the new meta alert.
   * @return The newly created meta alert document.
   * @throws InvalidCreateException If the request has no alerts or no groups, or if writing or
   *     committing the documents fails.
   * @throws IOException If the alerts being grouped cannot be retrieved.
   * @throws IllegalStateException If the same alert GUID is requested more than once.
   * @throws IllegalArgumentException If no index or sensor type can be determined for an alert
   *     that needs updating.
   */
  @Override
  public Document createMetaAlert(MetaAlertCreateRequest request)
      throws InvalidCreateException, IOException {
    List<GetRequest> alertRequests = request.getAlerts();
    // A missing list is treated the same as an empty one rather than failing with an NPE.
    if (alertRequests == null || alertRequests.isEmpty()) {
      throw new InvalidCreateException("MetaAlertCreateRequest must contain alerts");
    }
    if (request.getGroups() == null || request.getGroups().isEmpty()) {
      throw new InvalidCreateException("MetaAlertCreateRequest must contain UI groups");
    }

    // Retrieve the documents going into the meta alert and build it
    Iterable<Document> alerts = getRetrieveLatestDao().getAllLatest(alertRequests);
    Document metaAlert = buildCreateDocument(alerts, request.getGroups(),
        MetaAlertConstants.ALERT_FIELD);
    MetaScores.calculateMetaScores(metaAlert, getConfig().getThreatTriageField(),
        getConfig().getThreatSort());
    // Add source type to be consistent with other sources and allow filtering
    metaAlert.getDocument()
        .put(getConfig().getSourceTypeField(), MetaAlertConstants.METAALERT_TYPE);

    // Start a list of updates / inserts we need to run
    Map<Document, Optional<String>> updates = new HashMap<>();
    updates.put(metaAlert, Optional.of(METAALERTS_COLLECTION));

    // Index the requests by GUID. A plain map is used (rather than Collectors.toMap) so that a
    // request without a sensor type does not fail when its explicit index makes the sensor type
    // unnecessary. Duplicate GUIDs are still rejected, with a message naming the GUID.
    Map<String, GetRequest> requestsByGuid = new HashMap<>();
    for (GetRequest alertRequest : alertRequests) {
      if (requestsByGuid.put(alertRequest.getGuid(), alertRequest) != null) {
        throw new IllegalStateException(
            "Duplicate alert GUID in MetaAlertCreateRequest: " + alertRequest.getGuid());
      }
    }

    try {
      // We need to update the associated alerts with the new meta alerts, making sure existing
      // links are maintained.
      for (Document alert : alerts) {
        if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) {
          String alertGuid = alert.getGuid();
          updates.put(alert, indexForAlertRequest(alertGuid, requestsByGuid.get(alertGuid)));
        }
      }

      // Kick off any updates.
      update(updates);
      solrClient.commit(METAALERTS_COLLECTION);
      return metaAlert;
    } catch (IOException | SolrServerException e) {
      throw new InvalidCreateException("Unable to create meta alert", e);
    }
  }

  /**
   * Determines where an alert should be written: the index named in its request if present,
   * otherwise the request's sensor type.
   * @param alertGuid The GUID of the alert, used only for the error message.
   * @param alertRequest The request the alert was retrieved with; may be null if none matched.
   * @return The index (or sensor type) to write the alert to; never empty.
   * @throws IllegalArgumentException If neither an index nor a sensor type is available.
   */
  private static Optional<String> indexForAlertRequest(String alertGuid,
      GetRequest alertRequest) {
    if (alertRequest != null) {
      // Use the index in the request if it exists
      Optional<String> index = alertRequest.getIndex();
      if (index != null && index.isPresent()) {
        return index;
      }
      String sensorType = alertRequest.getSensorType();
      if (sensorType != null) {
        return Optional.of(sensorType);
      }
    }
    throw new IllegalArgumentException("Could not find index for " + alertGuid);
  }

  /**
   * Updates a document in Solr for a given collection. Collection is not optional for Solr.
   *
   * <p>Every meta alert found for the alert has its embedded copy of the alert replaced, and
   * those meta alerts are written in the same batch as the alert itself.
   * @param update The update to be run
   * @param collection The index to be updated. Mandatory for Solr
   * @return The updated document.
   * @throws IOException Thrown when an error occurs during the write, when the associated meta
   *     alerts cannot be retrieved, or when the commit fails.
   * @throws UnsupportedOperationException If the document being updated is itself a meta alert.
   */
  @Override
  public Document update(Document update, Optional<String> collection) throws IOException {
    if (MetaAlertConstants.METAALERT_TYPE.equals(update.getSensorType())) {
      // We've been passed an update to the meta alert.
      throw new UnsupportedOperationException("Meta alerts cannot be directly updated");
    }

    // Index can't be optional, or it won't be committed
    Map<Document, Optional<String>> updates = new HashMap<>();
    updates.put(update, collection);

    // We need to update an alert itself. It cannot be delegated in Solr; we need to retrieve all
    // metaalerts and update the entire document for each.
    SearchResponse searchResponse;
    try {
      searchResponse = metaAlertSearchDao.getAllMetaAlertsForAlert(update.getGuid());
    } catch (InvalidSearchException e) {
      throw new IOException("Unable to retrieve metaalerts for alert", e);
    }

    for (SearchResult searchResult : searchResponse.getResults()) {
      Document metaAlert = new Document(searchResult.getSource(), searchResult.getId(),
          MetaAlertConstants.METAALERT_TYPE, 0L);
      // Only meta alerts that actually changed need to be rewritten.
      if (replaceAlertInMetaAlert(metaAlert, update)) {
        updates.put(metaAlert, Optional.of(METAALERTS_COLLECTION));
      }
    }

    // Run the alert's update
    getUpdateDao().batchUpdate(updates);

    try {
      solrClient.commit(METAALERTS_COLLECTION);
      if (collection.isPresent()) {
        solrClient.commit(collection.get());
      }
    } catch (SolrServerException e) {
      throw new IOException("Unable to update document", e);
    }

    return update;
  }

  /**
   * Adds a comment to an alert by delegating to the underlying update DAO.
   * @param request The comment to add and the alert to add it to.
   * @return The document returned by the underlying update DAO.
   * @throws IOException If the underlying update DAO fails.
   */
  @Override
  public Document addCommentToAlert(CommentAddRemoveRequest request) throws IOException {
    return getUpdateDao().addCommentToAlert(request);
  }

  /**
   * Removes a comment from an alert by delegating to the underlying update DAO.
   * @param request The comment to remove and the alert to remove it from.
   * @return The document returned by the underlying update DAO.
   * @throws IOException If the underlying update DAO fails.
   */
  @Override
  public Document removeCommentFromAlert(CommentAddRemoveRequest request) throws IOException {
    return getUpdateDao().removeCommentFromAlert(request);
  }

  /**
   * Adds a comment to an alert by delegating to the underlying update DAO.
   * @param request The comment to add and the alert to add it to.
   * @param latest The document passed through to the underlying update DAO.
   * @return The document returned by the underlying update DAO.
   * @throws IOException If the underlying update DAO fails.
   */
  @Override
  public Document addCommentToAlert(CommentAddRemoveRequest request, Document latest)
      throws IOException {
    return getUpdateDao().addCommentToAlert(request, latest);
  }

  /**
   * Removes a comment from an alert by delegating to the underlying update DAO.
   * @param request The comment to remove and the alert to remove it from.
   * @param latest The document passed through to the underlying update DAO.
   * @return The document returned by the underlying update DAO.
   * @throws IOException If the underlying update DAO fails.
   */
  @Override
  public Document removeCommentFromAlert(CommentAddRemoveRequest request, Document latest)
      throws IOException {
    return getUpdateDao().removeCommentFromAlert(request, latest);
  }

  /**
   * Swaps the copy of an alert held by a meta alert for the supplied version. The alert is only
   * re-added when removing it by GUID reported a change to the meta alert.
   * @param metaAlert The meta alert to modify in place.
   * @param alert The new version of the alert.
   * @return true if the meta alert was changed, false otherwise.
   */
  protected boolean replaceAlertInMetaAlert(Document metaAlert, Document alert) {
    boolean metaAlertUpdated = removeAlertsFromMetaAlert(metaAlert,
        Collections.singleton(alert.getGuid()));
    if (metaAlertUpdated) {
      addAlertsToMetaAlert(metaAlert, Collections.singleton(alert));
    }
    return metaAlertUpdated;
  }

  /**
   * Adds the requested alerts to an existing, active meta alert and commits the meta alerts
   * collection.
   * @param metaAlertGuid The GUID of the meta alert to add to.
   * @param alertRequests The alerts to add.
   * @return The meta alert document as retrieved before the updates were built.
   * @throws IOException If the meta alert cannot be found, or if retrieval, the write or the
   *     commit fails.
   * @throws IllegalStateException If the meta alert is not in the ACTIVE status.
   */
  @Override
  public Document addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
      throws IOException, IllegalStateException {
    Document metaAlert = getRetrieveLatestDao()
        .getLatest(metaAlertGuid, MetaAlertConstants.METAALERT_TYPE);
    // Fail with a descriptive error instead of a NullPointerException when nothing came back.
    if (metaAlert == null) {
      throw new IOException("Unable to add alerts to meta alert. Meta alert with guid "
          + metaAlertGuid + " cannot be found.");
    }

    boolean active = MetaAlertStatus.ACTIVE.getStatusString()
        .equals(metaAlert.getDocument().get(MetaAlertConstants.STATUS_FIELD));
    if (!active) {
      throw new IllegalStateException("Adding alerts to an INACTIVE meta alert is not allowed");
    }

    Iterable<Document> alerts = getRetrieveLatestDao().getAllLatest(alertRequests);
    Map<Document, Optional<String>> updates = buildAddAlertToMetaAlertUpdates(metaAlert, alerts);
    update(updates);

    try {
      solrClient.commit(METAALERTS_COLLECTION);
    } catch (SolrServerException e) {
      throw new IOException("Unable to commit alerts to metaalert: " + metaAlertGuid, e);
    }
    return metaAlert;
  }
}