blob: 54b5b64f338cb6e704b08b8caf4cfb20806d5f8d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.solr.dao;
import static org.apache.metron.indexing.dao.IndexDao.COMMENTS_FIELD;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.search.AlertComment;
import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
import org.apache.metron.indexing.dao.update.Document;
import org.apache.metron.indexing.dao.update.UpdateDao;
import org.apache.solr.client.solrj.SolrClient;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.common.SolrInputDocument;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * {@link UpdateDao} implementation that writes documents to Solr through a {@link SolrClient}.
 *
 * <p>Every write is followed by an explicit commit on the affected collection(s). Alert comments
 * are converted to their raw JSON string form (via {@link AlertComment#asJson()}) before being
 * sent to Solr.
 */
public class SolrUpdateDao implements UpdateDao {

  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private transient SolrClient client;
  private AccessConfig config;
  private transient SolrRetrieveLatestDao retrieveLatestDao;

  /**
   * @param client Solr client used for all add/commit calls
   * @param retrieveLatestDao used to look up the latest version of a document by guid/sensor type
   * @param config access configuration; its index supplier maps a sensor type to a collection
   */
  public SolrUpdateDao(SolrClient client, SolrRetrieveLatestDao retrieveLatestDao, AccessConfig config) {
    this.client = client;
    this.retrieveLatestDao = retrieveLatestDao;
    this.config = config;
  }

  /**
   * Writes a single document to Solr and commits the target collection.
   *
   * <p>If the comments field holds a non-empty list whose first element is a {@link Map}, the
   * document is copied and the copy's comments are converted to raw JSON strings; the caller's
   * document is not modified in that case.
   *
   * @param update the document to write
   * @param rawIndex explicit collection to write to, if any; otherwise the collection is resolved
   *     through {@code SolrUtilities.getIndex} using the configured index supplier
   * @return the document that was actually written (either {@code update} itself or the converted
   *     copy)
   * @throws IOException if the Solr client fails with a {@link SolrServerException} or an
   *     {@link IOException}
   * @throws IllegalStateException if no collection could be resolved
   */
  @Override
  public Document update(Document update, Optional<String> rawIndex) throws IOException {
    Document newVersion = update;
    // Handle any case where we're given comments in Map form, instead of raw String.
    // Only the first element is inspected, exactly as a cheap shape check.
    Object commentsObj = update.getDocument().get(COMMENTS_FIELD);
    if (commentsObj instanceof List
        && !((List<?>) commentsObj).isEmpty()
        && ((List<?>) commentsObj).get(0) instanceof Map) {
      newVersion = new Document(update);
      convertCommentsToRaw(newVersion.getDocument());
    }
    try {
      SolrInputDocument solrInputDocument = SolrUtilities.toSolrInputDocument(newVersion);
      Optional<String> index = SolrUtilities
          .getIndex(config.getIndexSupplier(), newVersion.getSensorType(), rawIndex);
      String collection = index.orElseThrow(
          () -> new IllegalStateException("Index must be specified or inferred."));
      this.client.add(collection, solrInputDocument);
      this.client.commit(collection);
    } catch (SolrServerException e) {
      throw new IOException(e);
    }
    return newVersion;
  }

  /**
   * Writes a batch of documents, grouping them by target collection so that each collection
   * receives a single add call, then commits every collection that was written to.
   *
   * <p>NOTE(review): unlike {@link #update(Document, Optional)}, this method does not convert
   * Map-form comments to raw JSON before writing — confirm callers only pass raw comments.
   *
   * @param updates documents to write, each mapped to an optional explicit collection
   * @return the {@code updates} map that was passed in
   * @throws IOException if the Solr client fails with a {@link SolrServerException} or an
   *     {@link IOException}
   */
  @Override
  public Map<Document, Optional<String>> batchUpdate(Map<Document, Optional<String>> updates) throws IOException {
    // Documents grouped by the collection they will be written to.
    Map<String, Collection<SolrInputDocument>> solrCollectionUpdates = new HashMap<>();

    for (Entry<Document, Optional<String>> entry : updates.entrySet()) {
      Document document = entry.getKey();
      SolrInputDocument solrInputDocument = SolrUtilities.toSolrInputDocument(document);
      Optional<String> index = SolrUtilities
          .getIndex(config.getIndexSupplier(), document.getSensorType(), entry.getValue());
      // Fall back to asking the index supplier directly when no index was resolved.
      String collection = index.isPresent()
          ? index.get()
          : config.getIndexSupplier().apply(document.getSensorType());
      solrCollectionUpdates
          .computeIfAbsent(collection, key -> new ArrayList<>())
          .add(solrInputDocument);
    }

    try {
      for (Entry<String, Collection<SolrInputDocument>> entry : solrCollectionUpdates.entrySet()) {
        this.client.add(entry.getKey(), entry.getValue());
      }
      // Commit only after every add has been issued.
      for (String collection : solrCollectionUpdates.keySet()) {
        this.client.commit(collection);
      }
    } catch (SolrServerException e) {
      throw new IOException(e);
    }
    return updates;
  }

  /**
   * Looks up the latest version of the alert named by the request and adds the request's comment
   * to it.
   *
   * @param request identifies the alert (guid, sensor type) and carries the comment to add
   * @return the updated document as written
   * @throws IOException if the document cannot be found or the write fails
   */
  @Override
  public Document addCommentToAlert(CommentAddRemoveRequest request) throws IOException {
    Document latest = retrieveLatestDao.getLatest(request.getGuid(), request.getSensorType());
    return addCommentToAlert(request, latest);
  }

  /**
   * Appends the request's comment to the comments of {@code latest} and writes the result.
   *
   * <p>Existing comments are read as a list of maps and re-serialized to JSON strings; a missing
   * or {@code null} comments field is treated as "no comments yet". The update is applied to a
   * new {@link Document} constructed from {@code latest}.
   *
   * @param request carries the comment text, username and timestamp to add
   * @param latest the current version of the alert
   * @return the updated document as written
   * @throws IOException if {@code latest} or its document map is {@code null}, or the write fails
   */
  @Override
  public Document addCommentToAlert(CommentAddRemoveRequest request, Document latest) throws IOException {
    if (latest == null || latest.getDocument() == null) {
      throw new IOException(String.format("Unable to add comment. Document with guid %s cannot be found.",
          request.getGuid()));
    }

    // Plain get() plus a null check: getOrDefault would still hand back null when the key is
    // present but mapped to null.
    @SuppressWarnings("unchecked")
    List<Map<String, Object>> existingComments =
        (List<Map<String, Object>>) latest.getDocument().get(COMMENTS_FIELD);

    // Convert all comments back to raw JSON before updating.
    List<String> commentStrs = new ArrayList<>();
    if (existingComments != null) {
      for (Map<String, Object> comment : existingComments) {
        commentStrs.add(new AlertComment(comment).asJson());
      }
    }
    commentStrs.add(new AlertComment(
        request.getComment(),
        request.getUsername(),
        request.getTimestamp()
    ).asJson());

    Document newVersion = new Document(latest);
    newVersion.getDocument().put(COMMENTS_FIELD, commentStrs);
    return update(newVersion, Optional.empty());
  }

  /**
   * Looks up the latest version of the alert named by the request and removes the request's
   * comment from it.
   *
   * @param request identifies the alert (guid, sensor type) and the comment to remove
   * @return the updated document as written
   * @throws IOException if the document cannot be found, has no comments, or the write fails
   */
  @Override
  public Document removeCommentFromAlert(CommentAddRemoveRequest request)
      throws IOException {
    Document latest = retrieveLatestDao.getLatest(request.getGuid(), request.getSensorType());
    return removeCommentFromAlert(request, latest);
  }

  /**
   * Removes the first comment equal to the one described by the request (comment text, username,
   * timestamp) from {@code latest} and writes the result.
   *
   * <p>The remaining comments are written back as JSON strings; if no comment matches, the
   * comments are rewritten unchanged. The update is applied to a new {@link Document} constructed
   * from {@code latest}.
   *
   * @param request describes the comment to remove
   * @param latest the current version of the alert
   * @return the updated document as written
   * @throws IOException if {@code latest} or its document map is {@code null}, if the document
   *     has no comments field, or if the write fails
   */
  @Override
  public Document removeCommentFromAlert(CommentAddRemoveRequest request, Document latest)
      throws IOException {
    if (latest == null || latest.getDocument() == null) {
      throw new IOException(String.format("Unable to remove comment. Document with guid %s cannot be found.",
          request.getGuid()));
    }

    @SuppressWarnings("unchecked")
    List<Map<String, Object>> commentMap = (List<Map<String, Object>>) latest.getDocument()
        .get(COMMENTS_FIELD);
    // Can't remove anything if there's nothing there
    if (commentMap == null) {
      throw new IOException(String.format("Unable to remove comment. Document with guid %s has no comments.",
          request.getGuid()));
    }

    List<AlertComment> comments = new ArrayList<>();
    for (Map<String, Object> comment : commentMap) {
      comments.add(new AlertComment(comment));
    }
    // List.remove(Object) drops only the first element that equals the target.
    comments.remove(
        new AlertComment(request.getComment(), request.getUsername(), request.getTimestamp()));

    List<String> commentsAsJson = comments.stream().map(AlertComment::asJson)
        .collect(Collectors.toList());
    Document newVersion = new Document(latest);
    newVersion.getDocument().put(COMMENTS_FIELD, commentsAsJson);
    return update(newVersion, Optional.empty());
  }

  /**
   * Replaces, in place, the Map-form comments stored in {@code source} with their JSON string
   * form. Does nothing when the comments field is absent, {@code null} or empty.
   *
   * @param source document map whose comments field, if present, is expected to hold a list of
   *     maps
   */
  public void convertCommentsToRaw(Map<String,Object> source) {
    @SuppressWarnings("unchecked")
    List<Map<String, Object>> comments = (List<Map<String, Object>>) source.get(COMMENTS_FIELD);
    if (comments == null || comments.isEmpty()) {
      return;
    }
    List<String> asJson = new ArrayList<>();
    for (Map<String, Object> comment : comments) {
      asJson.add((new AlertComment(comment)).asJson());
    }
    source.put(COMMENTS_FIELD, asJson);
  }
}