| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.metron.solr.dao; |
| |
| import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD; |
| |
| import java.io.IOException; |
| import java.lang.invoke.MethodHandles; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| import org.apache.metron.indexing.dao.AccessConfig; |
| import org.apache.metron.indexing.dao.search.AlertComment; |
| import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest; |
| import org.apache.metron.indexing.dao.update.Document; |
| import org.apache.metron.indexing.dao.update.UpdateDao; |
| import org.apache.solr.client.solrj.SolrClient; |
| import org.apache.solr.client.solrj.SolrServerException; |
| import org.apache.solr.common.SolrInputDocument; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class SolrUpdateDao implements UpdateDao { |
| private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); |
| |
| private transient SolrClient client; |
| private AccessConfig config; |
| private transient SolrRetrieveLatestDao retrieveLatestDao; |
| |
| public SolrUpdateDao(SolrClient client, SolrRetrieveLatestDao retrieveLatestDao, AccessConfig config) { |
| this.client = client; |
| this.retrieveLatestDao = retrieveLatestDao; |
| this.config = config; |
| } |
| |
| @Override |
| public Document update(Document update, Optional<String> rawIndex) throws IOException { |
| Document newVersion = update; |
| // Handle any case where we're given comments in Map form, instead of raw String |
| Object commentsObj = update.getDocument().get(COMMENTS_FIELD); |
| if ( commentsObj instanceof List && |
| ((List<Object>) commentsObj).size() > 0 && |
| ((List<Object>) commentsObj).get(0) instanceof Map) { |
| newVersion = new Document(update); |
| convertCommentsToRaw(newVersion.getDocument()); |
| } |
| try { |
| SolrInputDocument solrInputDocument = SolrUtilities.toSolrInputDocument(newVersion); |
| Optional<String> index = SolrUtilities |
| .getIndex(config.getIndexSupplier(), newVersion.getSensorType(), rawIndex); |
| if (index.isPresent()) { |
| this.client.add(index.get(), solrInputDocument); |
| this.client.commit(index.get()); |
| } else { |
| throw new IllegalStateException("Index must be specified or inferred."); |
| } |
| } catch (SolrServerException e) { |
| throw new IOException(e); |
| } |
| return newVersion; |
| } |
| |
| @Override |
| public Map<Document, Optional<String>> batchUpdate(Map<Document, Optional<String>> updates) throws IOException { |
| // updates with a collection specified |
| Map<String, Collection<SolrInputDocument>> solrCollectionUpdates = new HashMap<>(); |
| Set<String> collectionsUpdated = new HashSet<>(); |
| |
| for (Entry<Document, Optional<String>> entry : updates.entrySet()) { |
| SolrInputDocument solrInputDocument = SolrUtilities.toSolrInputDocument(entry.getKey()); |
| Optional<String> index = SolrUtilities |
| .getIndex(config.getIndexSupplier(), entry.getKey().getSensorType(), entry.getValue()); |
| if (index.isPresent()) { |
| Collection<SolrInputDocument> solrInputDocuments = solrCollectionUpdates |
| .getOrDefault(index.get(), new ArrayList<>()); |
| solrInputDocuments.add(solrInputDocument); |
| solrCollectionUpdates.put(index.get(), solrInputDocuments); |
| collectionsUpdated.add(index.get()); |
| } else { |
| String lookupIndex = config.getIndexSupplier().apply(entry.getKey().getSensorType()); |
| Collection<SolrInputDocument> solrInputDocuments = solrCollectionUpdates |
| .getOrDefault(lookupIndex, new ArrayList<>()); |
| solrInputDocuments.add(solrInputDocument); |
| solrCollectionUpdates.put(lookupIndex, solrInputDocuments); |
| collectionsUpdated.add(lookupIndex); |
| } |
| } |
| try { |
| for (Entry<String, Collection<SolrInputDocument>> entry : solrCollectionUpdates |
| .entrySet()) { |
| this.client.add(entry.getKey(), entry.getValue()); |
| } |
| for (String collection : collectionsUpdated) { |
| this.client.commit(collection); |
| } |
| } catch (SolrServerException e) { |
| throw new IOException(e); |
| } |
| return updates; |
| } |
| |
| @Override |
| public Document addCommentToAlert(CommentAddRemoveRequest request) throws IOException { |
| Document latest = retrieveLatestDao.getLatest(request.getGuid(), request.getSensorType()); |
| return addCommentToAlert(request, latest); |
| } |
| |
| @Override |
| public Document addCommentToAlert(CommentAddRemoveRequest request, Document latest) throws IOException { |
| if (latest == null || latest.getDocument() == null) { |
| throw new IOException(String.format("Unable to add comment. Document with guid %s cannot be found.", |
| request.getGuid())); |
| } |
| |
| @SuppressWarnings("unchecked") |
| List<Map<String, Object>> comments = (List<Map<String, Object>>) latest.getDocument() |
| .getOrDefault(COMMENTS_FIELD, new ArrayList<>()); |
| List<Map<String, Object>> originalComments = new ArrayList<>(comments); |
| |
| // Convert all comments back to raw JSON before updating. |
| List<String> commentStrs = new ArrayList<>(); |
| for (Map<String, Object> comment : originalComments) { |
| commentStrs.add(new AlertComment(comment).asJson()); |
| } |
| commentStrs.add(new AlertComment( |
| request.getComment(), |
| request.getUsername(), |
| request.getTimestamp() |
| ).asJson()); |
| |
| Document newVersion = new Document(latest); |
| newVersion.getDocument().put(COMMENTS_FIELD, commentStrs); |
| return update(newVersion, Optional.empty()); |
| } |
| |
| @Override |
| public Document removeCommentFromAlert(CommentAddRemoveRequest request) |
| throws IOException { |
| Document latest = retrieveLatestDao.getLatest(request.getGuid(), request.getSensorType()); |
| return removeCommentFromAlert(request, latest); |
| } |
| |
| @Override |
| public Document removeCommentFromAlert(CommentAddRemoveRequest request, Document latest) |
| throws IOException { |
| if (latest == null || latest.getDocument() == null) { |
| throw new IOException(String.format("Unable to remove comment. Document with guid %s cannot be found.", |
| request.getGuid())); |
| } |
| |
| @SuppressWarnings("unchecked") |
| List<Map<String, Object>> commentMap = (List<Map<String, Object>>) latest.getDocument() |
| .get(COMMENTS_FIELD); |
| // Can't remove anything if there's nothing there |
| if (commentMap == null) { |
| throw new IOException(String.format("Unable to remove comment. Document with guid %s has no comments.", |
| request.getGuid())); |
| } |
| List<Map<String, Object>> originalComments = new ArrayList<>(commentMap); |
| List<AlertComment> comments = new ArrayList<>(); |
| for (Map<String, Object> commentStr : originalComments) { |
| comments.add(new AlertComment(commentStr)); |
| } |
| |
| comments.remove( |
| new AlertComment(request.getComment(), request.getUsername(), request.getTimestamp())); |
| List<String> commentsAsJson = comments.stream().map(AlertComment::asJson) |
| .collect(Collectors.toList()); |
| Document newVersion = new Document(latest); |
| newVersion.getDocument().put(COMMENTS_FIELD, commentsAsJson); |
| return update(newVersion, Optional.empty()); |
| } |
| |
| public void convertCommentsToRaw(Map<String,Object> source) { |
| @SuppressWarnings("unchecked") |
| List<Map<String, Object>> comments = (List<Map<String, Object>>) source.get(COMMENTS_FIELD); |
| if (comments == null || comments.isEmpty()) { |
| return; |
| } |
| List<String> asJson = new ArrayList<>(); |
| for (Map<String, Object> comment : comments) { |
| asJson.add((new AlertComment(comment)).asJson()); |
| } |
| source.put(COMMENTS_FIELD, asJson); |
| } |
| } |