| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.metron.elasticsearch.dao; |
| |
| import org.apache.metron.common.Constants; |
| import org.apache.metron.indexing.dao.AccessConfig; |
| import org.apache.metron.indexing.dao.IndexDao; |
| import org.apache.metron.indexing.dao.MultiIndexDao; |
| import org.apache.metron.indexing.dao.RetrieveLatestDao; |
| import org.apache.metron.indexing.dao.metaalert.MetaAlertConfig; |
| import org.apache.metron.indexing.dao.metaalert.MetaAlertConstants; |
| import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; |
| import org.apache.metron.indexing.dao.metaalert.MetaAlertDao; |
| import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus; |
| import org.apache.metron.indexing.dao.search.FieldType; |
| import org.apache.metron.indexing.dao.search.GetRequest; |
| import org.apache.metron.indexing.dao.search.GroupRequest; |
| import org.apache.metron.indexing.dao.search.GroupResponse; |
| import org.apache.metron.indexing.dao.search.InvalidCreateException; |
| import org.apache.metron.indexing.dao.search.InvalidSearchException; |
| import org.apache.metron.indexing.dao.search.SearchRequest; |
| import org.apache.metron.indexing.dao.search.SearchResponse; |
| import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest; |
| import org.apache.metron.indexing.dao.update.Document; |
| import org.apache.metron.indexing.dao.update.OriginalNotFoundException; |
| import org.apache.metron.indexing.dao.update.PatchRequest; |
| |
| import java.io.IOException; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.function.Supplier; |
| |
| public class ElasticsearchMetaAlertDao implements MetaAlertDao { |
| |
| public static final String THREAT_TRIAGE_FIELD = MetaAlertConstants.THREAT_FIELD_DEFAULT |
| .replace('.', ':'); |
| public static final String METAALERTS_INDEX = "metaalert_index"; |
| public static final String SOURCE_TYPE_FIELD = Constants.SENSOR_TYPE.replace('.', ':'); |
| protected String metaAlertsIndex = METAALERTS_INDEX; |
| protected String threatSort = MetaAlertConstants.THREAT_SORT_DEFAULT; |
| |
| private ElasticsearchDao elasticsearchDao; |
| private IndexDao indexDao; |
| private ElasticsearchMetaAlertSearchDao metaAlertSearchDao; |
| private ElasticsearchMetaAlertUpdateDao metaAlertUpdateDao; |
| private ElasticsearchMetaAlertRetrieveLatestDao metaAlertRetrieveLatestDao; |
| |
| protected int pageSize = 500; |
| |
| /** |
| * Wraps an {@link org.apache.metron.indexing.dao.IndexDao} to handle meta alerts. |
| * @param indexDao The Dao to wrap |
| */ |
| public ElasticsearchMetaAlertDao(IndexDao indexDao) { |
| this(indexDao, METAALERTS_INDEX, MetaAlertConstants.THREAT_SORT_DEFAULT); |
| } |
| |
| /** |
| * Wraps an {@link org.apache.metron.indexing.dao.IndexDao} to handle meta alerts. |
| * @param indexDao The Dao to wrap |
| * @param threatSort The summary aggregation of all child threat triage scores used |
| * as the overall threat triage score for the metaalert. This |
| * can be either max, min, average, count, median, or sum. |
| */ |
| public ElasticsearchMetaAlertDao(IndexDao indexDao, String metaAlertsIndex, |
| String threatSort) { |
| init(indexDao, Optional.of(threatSort)); |
| this.threatSort = threatSort; |
| this.metaAlertsIndex = metaAlertsIndex; |
| } |
| |
| public ElasticsearchMetaAlertDao() { |
| //uninitialized. |
| } |
| |
| /** |
| * Initializes this implementation by setting the supplied IndexDao and also setting a separate |
| * ElasticsearchDao. |
| * This is needed for some specific Elasticsearch functions (looking up an index from a GUID for |
| * example). |
| * @param indexDao The DAO to wrap for our queries |
| * @param threatSort The summary aggregation of the child threat triage scores used |
| * as the overall threat triage score for the metaalert. This |
| * can be either max, min, average, count, median, or sum. |
| */ |
| @Override |
| public void init(IndexDao indexDao, Optional<String> threatSort) { |
| if (indexDao instanceof MultiIndexDao) { |
| this.indexDao = indexDao; |
| MultiIndexDao multiIndexDao = (MultiIndexDao) indexDao; |
| for (IndexDao childDao : multiIndexDao.getIndices()) { |
| if (childDao instanceof ElasticsearchDao) { |
| this.elasticsearchDao = (ElasticsearchDao) childDao; |
| } |
| } |
| } else if (indexDao instanceof ElasticsearchDao) { |
| this.indexDao = indexDao; |
| this.elasticsearchDao = (ElasticsearchDao) indexDao; |
| } else { |
| throw new IllegalArgumentException( |
| "Need an ElasticsearchDao when using ElasticsearchMetaAlertDao" |
| ); |
| } |
| |
| if (threatSort.isPresent()) { |
| this.threatSort = threatSort.get(); |
| } |
| Supplier<Map<String, Object>> globalConfigSupplier = () -> new HashMap<>(); |
| if(elasticsearchDao != null && elasticsearchDao.getAccessConfig() != null) { |
| globalConfigSupplier = elasticsearchDao.getAccessConfig().getGlobalConfigSupplier(); |
| } |
| MetaAlertConfig config = new MetaAlertConfig( |
| metaAlertsIndex, |
| this.threatSort, |
| globalConfigSupplier |
| ) { |
| @Override |
| protected String getDefaultThreatTriageField() { |
| return THREAT_TRIAGE_FIELD; |
| } |
| |
| @Override |
| protected String getDefaultSourceTypeField() { |
| return SOURCE_TYPE_FIELD; |
| } |
| }; |
| |
| this.metaAlertSearchDao = new ElasticsearchMetaAlertSearchDao( |
| elasticsearchDao, |
| config, |
| pageSize); |
| this.metaAlertRetrieveLatestDao = new ElasticsearchMetaAlertRetrieveLatestDao(indexDao); |
| this.metaAlertUpdateDao = new ElasticsearchMetaAlertUpdateDao( |
| elasticsearchDao, |
| metaAlertRetrieveLatestDao, |
| config, |
| pageSize); |
| } |
| |
| @Override |
| public void init(AccessConfig config) { |
| // Do nothing. We're just wrapping a child dao |
| } |
| |
| @Override |
| public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException { |
| return indexDao.getColumnMetadata(indices); |
| } |
| |
| @Override |
| public Document getLatest(String guid, String sensorType) throws IOException { |
| return indexDao.getLatest(guid, sensorType); |
| } |
| |
| @Override |
| public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException { |
| return indexDao.getAllLatest(getRequests); |
| } |
| |
| @Override |
| public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException, IOException { |
| return metaAlertSearchDao.getAllMetaAlertsForAlert(guid); |
| } |
| |
| @Override |
| public Document createMetaAlert(MetaAlertCreateRequest request) |
| throws InvalidCreateException, IOException { |
| return metaAlertUpdateDao.createMetaAlert(request); |
| } |
| |
| @Override |
| public Document addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests) |
| throws IOException { |
| return metaAlertUpdateDao.addAlertsToMetaAlert(metaAlertGuid, alertRequests); |
| } |
| |
| @Override |
| public Document removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests) |
| throws IOException { |
| return metaAlertUpdateDao.removeAlertsFromMetaAlert(metaAlertGuid, alertRequests); |
| } |
| |
| @Override |
| public Document updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status) |
| throws IOException { |
| return metaAlertUpdateDao.updateMetaAlertStatus(metaAlertGuid, status); |
| } |
| |
| @Override |
| public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { |
| return metaAlertSearchDao.search(searchRequest); |
| } |
| |
| @Override |
| public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { |
| return metaAlertSearchDao.group(groupRequest); |
| } |
| |
| @Override |
| public Document update(Document update, Optional<String> index) throws IOException { |
| return metaAlertUpdateDao.update(update, index); |
| } |
| |
| @Override |
| public Map<Document, Optional<String>> batchUpdate(Map<Document, Optional<String>> updates) { |
| return metaAlertUpdateDao.batchUpdate(updates); |
| } |
| |
| @Override |
| public Document addCommentToAlert(CommentAddRemoveRequest request) throws IOException { |
| return indexDao.addCommentToAlert(request); |
| } |
| |
| @Override |
| public Document removeCommentFromAlert(CommentAddRemoveRequest request) throws IOException { |
| return indexDao.removeCommentFromAlert(request); |
| } |
| |
| @Override |
| public Document addCommentToAlert(CommentAddRemoveRequest request, Document latest) throws IOException { |
| return indexDao.addCommentToAlert(request, latest); |
| } |
| |
| @Override |
| public Document removeCommentFromAlert(CommentAddRemoveRequest request, Document latest) throws IOException { |
| return indexDao.removeCommentFromAlert(request, latest); |
| } |
| |
| @Override |
| public Document patch(RetrieveLatestDao retrieveLatestDao, PatchRequest request, |
| Optional<Long> timestamp) |
| throws OriginalNotFoundException, IOException { |
| return metaAlertUpdateDao.patch(retrieveLatestDao, request, timestamp); |
| } |
| |
| public void setPageSize(int pageSize) { |
| this.pageSize = pageSize; |
| } |
| |
| } |