blob: ac5417e1537d0cbf5ac350eaa879c3630d746d51 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.elasticsearch.dao;
import org.apache.metron.common.Constants;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.IndexDao;
import org.apache.metron.indexing.dao.MultiIndexDao;
import org.apache.metron.indexing.dao.RetrieveLatestDao;
import org.apache.metron.indexing.dao.metaalert.MetaAlertConfig;
import org.apache.metron.indexing.dao.metaalert.MetaAlertConstants;
import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
import org.apache.metron.indexing.dao.metaalert.MetaAlertDao;
import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
import org.apache.metron.indexing.dao.search.FieldType;
import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.search.GroupRequest;
import org.apache.metron.indexing.dao.search.GroupResponse;
import org.apache.metron.indexing.dao.search.InvalidCreateException;
import org.apache.metron.indexing.dao.search.InvalidSearchException;
import org.apache.metron.indexing.dao.search.SearchRequest;
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
import org.apache.metron.indexing.dao.update.Document;
import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
import org.apache.metron.indexing.dao.update.PatchRequest;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;
/**
 * Elasticsearch-backed {@link MetaAlertDao} that wraps another {@link IndexDao}.
 *
 * <p>Meta alert specific operations (search, grouping, creation, updates, patches) are forwarded
 * to helper DAOs that are built in {@link #init(IndexDao, Optional)}. The remaining operations
 * (column metadata, latest-document lookups, comments) are forwarded to the wrapped
 * {@link IndexDao} unchanged.
 *
 * <p>An instance created through the no-arg constructor is unusable until
 * {@link #init(IndexDao, Optional)} has been called, because the delegates are still unset.
 */
public class ElasticsearchMetaAlertDao implements MetaAlertDao {

  /** {@link MetaAlertConstants#THREAT_FIELD_DEFAULT} with every {@code '.'} replaced by {@code ':'}. */
  public static final String THREAT_TRIAGE_FIELD = MetaAlertConstants.THREAT_FIELD_DEFAULT
      .replace('.', ':');

  /** Index name used for meta alerts when the caller does not supply one. */
  public static final String METAALERTS_INDEX = "metaalert_index";

  /** {@link Constants#SENSOR_TYPE} with every {@code '.'} replaced by {@code ':'}. */
  public static final String SOURCE_TYPE_FIELD = Constants.SENSOR_TYPE.replace('.', ':');

  // Both values are read by init(IndexDao, Optional) when it builds the MetaAlertConfig, so they
  // must hold their final values before init runs.
  protected String metaAlertsIndex = METAALERTS_INDEX;
  protected String threatSort = MetaAlertConstants.THREAT_SORT_DEFAULT;

  private ElasticsearchDao elasticsearchDao;
  private IndexDao indexDao;
  private ElasticsearchMetaAlertSearchDao metaAlertSearchDao;
  private ElasticsearchMetaAlertUpdateDao metaAlertUpdateDao;
  private ElasticsearchMetaAlertRetrieveLatestDao metaAlertRetrieveLatestDao;

  // Handed to the search and update helper DAOs at init time.
  protected int pageSize = 500;

  /**
   * Wraps an {@link org.apache.metron.indexing.dao.IndexDao} to handle meta alerts, using
   * {@link #METAALERTS_INDEX} and {@link MetaAlertConstants#THREAT_SORT_DEFAULT}.
   *
   * @param indexDao The Dao to wrap
   */
  public ElasticsearchMetaAlertDao(IndexDao indexDao) {
    this(indexDao, METAALERTS_INDEX, MetaAlertConstants.THREAT_SORT_DEFAULT);
  }

  /**
   * Wraps an {@link org.apache.metron.indexing.dao.IndexDao} to handle meta alerts.
   *
   * @param indexDao The Dao to wrap
   * @param metaAlertsIndex The name of the index used for meta alerts
   * @param threatSort The summary aggregation of all child threat triage scores used
   *                   as the overall threat triage score for the metaalert. This
   *                   can be either max, min, average, count, median, or sum.
   *                   Must not be {@code null}.
   * @throws IllegalArgumentException if {@code indexDao} is neither a {@link MultiIndexDao} nor
   *                                  an {@link ElasticsearchDao}
   */
  public ElasticsearchMetaAlertDao(IndexDao indexDao, String metaAlertsIndex,
      String threatSort) {
    // The index name has to be stored BEFORE init runs: init builds the MetaAlertConfig from
    // this field, so assigning it afterwards would leave the config (and the helper DAOs built
    // from it) pointing at the default index instead of the one the caller asked for.
    this.metaAlertsIndex = metaAlertsIndex;
    // init stores the threat sort itself, so no separate assignment is needed here.
    init(indexDao, Optional.of(threatSort));
  }

  /**
   * Creates an uninitialized instance. {@link #init(IndexDao, Optional)} must be called before
   * any other method is used.
   */
  public ElasticsearchMetaAlertDao() {
    //uninitialized.
  }

  /**
   * Initializes this implementation by setting the supplied IndexDao and also setting a separate
   * ElasticsearchDao.
   * This is needed for some specific Elasticsearch functions (looking up an index from a GUID for
   * example).
   *
   * <p>The helper DAOs are (re)built on every call from the current values of
   * {@code metaAlertsIndex}, {@code threatSort} and {@code pageSize}.
   *
   * @param indexDao The DAO to wrap for our queries
   * @param threatSort The summary aggregation of the child threat triage scores used
   *                   as the overall threat triage score for the metaalert. This
   *                   can be either max, min, average, count, median, or sum.
   *                   When empty, the currently configured value is kept.
   * @throws IllegalArgumentException if {@code indexDao} is neither a {@link MultiIndexDao} nor
   *                                  an {@link ElasticsearchDao}
   */
  @Override
  public void init(IndexDao indexDao, Optional<String> threatSort) {
    if (indexDao instanceof MultiIndexDao) {
      this.indexDao = indexDao;
      MultiIndexDao multiIndexDao = (MultiIndexDao) indexDao;
      // If several children are ElasticsearchDao instances, the last one encountered wins.
      // NOTE(review): when no child is an ElasticsearchDao, elasticsearchDao is left as it was
      // (null on a fresh instance) and no exception is raised - confirm this is intended.
      for (IndexDao childDao : multiIndexDao.getIndices()) {
        if (childDao instanceof ElasticsearchDao) {
          this.elasticsearchDao = (ElasticsearchDao) childDao;
        }
      }
    } else if (indexDao instanceof ElasticsearchDao) {
      this.indexDao = indexDao;
      this.elasticsearchDao = (ElasticsearchDao) indexDao;
    } else {
      throw new IllegalArgumentException(
          "Need an ElasticsearchDao when using ElasticsearchMetaAlertDao"
      );
    }

    threatSort.ifPresent(sort -> this.threatSort = sort);

    // Fall back to an empty global config when the Elasticsearch DAO (or its access config) is
    // not available.
    Supplier<Map<String, Object>> globalConfigSupplier = HashMap::new;
    if (elasticsearchDao != null && elasticsearchDao.getAccessConfig() != null) {
      globalConfigSupplier = elasticsearchDao.getAccessConfig().getGlobalConfigSupplier();
    }

    MetaAlertConfig config = new MetaAlertConfig(
        metaAlertsIndex,
        this.threatSort,
        globalConfigSupplier
    ) {
      @Override
      protected String getDefaultThreatTriageField() {
        return THREAT_TRIAGE_FIELD;
      }

      @Override
      protected String getDefaultSourceTypeField() {
        return SOURCE_TYPE_FIELD;
      }
    };

    this.metaAlertSearchDao = new ElasticsearchMetaAlertSearchDao(
        elasticsearchDao,
        config,
        pageSize);
    this.metaAlertRetrieveLatestDao = new ElasticsearchMetaAlertRetrieveLatestDao(indexDao);
    this.metaAlertUpdateDao = new ElasticsearchMetaAlertUpdateDao(
        elasticsearchDao,
        metaAlertRetrieveLatestDao,
        config,
        pageSize);
  }

  /**
   * Intentionally a no-op: this DAO only wraps a child DAO.
   *
   * @param config ignored
   */
  @Override
  public void init(AccessConfig config) {
    // Do nothing. We're just wrapping a child dao
  }

  /** Delegates to the wrapped {@link IndexDao}. */
  @Override
  public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException {
    return indexDao.getColumnMetadata(indices);
  }

  /** Delegates to the wrapped {@link IndexDao}. */
  @Override
  public Document getLatest(String guid, String sensorType) throws IOException {
    return indexDao.getLatest(guid, sensorType);
  }

  /** Delegates to the wrapped {@link IndexDao}. */
  @Override
  public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException {
    return indexDao.getAllLatest(getRequests);
  }

  /** Delegates to the meta alert search DAO built during initialization. */
  @Override
  public SearchResponse getAllMetaAlertsForAlert(String guid)
      throws InvalidSearchException, IOException {
    return metaAlertSearchDao.getAllMetaAlertsForAlert(guid);
  }

  /** Delegates to the meta alert update DAO built during initialization. */
  @Override
  public Document createMetaAlert(MetaAlertCreateRequest request)
      throws InvalidCreateException, IOException {
    return metaAlertUpdateDao.createMetaAlert(request);
  }

  /** Delegates to the meta alert update DAO built during initialization. */
  @Override
  public Document addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
      throws IOException {
    return metaAlertUpdateDao.addAlertsToMetaAlert(metaAlertGuid, alertRequests);
  }

  /** Delegates to the meta alert update DAO built during initialization. */
  @Override
  public Document removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests)
      throws IOException {
    return metaAlertUpdateDao.removeAlertsFromMetaAlert(metaAlertGuid, alertRequests);
  }

  /** Delegates to the meta alert update DAO built during initialization. */
  @Override
  public Document updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status)
      throws IOException {
    return metaAlertUpdateDao.updateMetaAlertStatus(metaAlertGuid, status);
  }

  /** Delegates to the meta alert search DAO built during initialization. */
  @Override
  public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException {
    return metaAlertSearchDao.search(searchRequest);
  }

  /** Delegates to the meta alert search DAO built during initialization. */
  @Override
  public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException {
    return metaAlertSearchDao.group(groupRequest);
  }

  /** Delegates to the meta alert update DAO built during initialization. */
  @Override
  public Document update(Document update, Optional<String> index) throws IOException {
    return metaAlertUpdateDao.update(update, index);
  }

  /** Delegates to the meta alert update DAO built during initialization. */
  @Override
  public Map<Document, Optional<String>> batchUpdate(Map<Document, Optional<String>> updates) {
    return metaAlertUpdateDao.batchUpdate(updates);
  }

  /** Delegates to the wrapped {@link IndexDao}. */
  @Override
  public Document addCommentToAlert(CommentAddRemoveRequest request) throws IOException {
    return indexDao.addCommentToAlert(request);
  }

  /** Delegates to the wrapped {@link IndexDao}. */
  @Override
  public Document removeCommentFromAlert(CommentAddRemoveRequest request) throws IOException {
    return indexDao.removeCommentFromAlert(request);
  }

  /** Delegates to the wrapped {@link IndexDao}. */
  @Override
  public Document addCommentToAlert(CommentAddRemoveRequest request, Document latest)
      throws IOException {
    return indexDao.addCommentToAlert(request, latest);
  }

  /** Delegates to the wrapped {@link IndexDao}. */
  @Override
  public Document removeCommentFromAlert(CommentAddRemoveRequest request, Document latest)
      throws IOException {
    return indexDao.removeCommentFromAlert(request, latest);
  }

  /** Delegates to the meta alert update DAO built during initialization. */
  @Override
  public Document patch(RetrieveLatestDao retrieveLatestDao, PatchRequest request,
      Optional<Long> timestamp)
      throws OriginalNotFoundException, IOException {
    return metaAlertUpdateDao.patch(retrieveLatestDao, request, timestamp);
  }

  /**
   * Sets the page size handed to the search and update helper DAOs.
   *
   * <p>The value is only read inside {@link #init(IndexDao, Optional)}; helper DAOs that were
   * already built keep the page size they were created with. Call this before {@code init}
   * (for example after using the no-arg constructor) for it to take effect.
   *
   * @param pageSize the page size to use for helper DAOs built by subsequent {@code init} calls
   */
  public void setPageSize(int pageSize) {
    this.pageSize = pageSize;
  }
}