blob: 09f7df9ba7cbfc820bacc16f52ae11063f33c22f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.indexing.dao;
import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.metron.indexing.dao.search.FieldType;
import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.search.GroupRequest;
import org.apache.metron.indexing.dao.search.GroupResponse;
import org.apache.metron.indexing.dao.search.InvalidSearchException;
import org.apache.metron.indexing.dao.search.SearchRequest;
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
import org.apache.metron.indexing.dao.update.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
 * An {@link IndexDao} that delegates every operation to a list of composed {@link IndexDao}s.
 *
 * <ul>
 *   <li>Writes ({@link #update}, {@link #batchUpdate}, comment add/remove) are sent to every
 *       composed DAO through a parallel stream. If any DAO fails, a single {@link IOException}
 *       is thrown whose message describes every failure and whose suppressed exceptions are the
 *       original throwables.</li>
 *   <li>{@link #getColumnMetadata}, {@link #search} and {@link #group} ask each DAO in list order
 *       and return the first non-null answer, or {@code null} if there is none.</li>
 *   <li>{@link #getLatest} queries every DAO and returns the document with the greatest
 *       timestamp.</li>
 * </ul>
 */
public class MultiIndexDao implements IndexDao {

  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  /** The composed DAOs, in the order they were supplied to the constructor. */
  private final List<IndexDao> indices;

  /**
   * @param composedDao The DAOs to compose, in the order they should be consulted.
   */
  public MultiIndexDao(IndexDao... composedDao) {
    indices = new ArrayList<>();
    Collections.addAll(indices, composedDao);
  }

  /**
   * @param composedDao The DAOs to compose, in the order they should be consulted.
   */
  public MultiIndexDao(Iterable<IndexDao> composedDao) {
    this.indices = new ArrayList<>();
    Iterables.addAll(indices, composedDao);
  }

  /**
   * Composes the given DAOs after passing each one through a decorator.
   *
   * @param composedDao The DAOs to compose.
   * @param decoratorTransformation Applied to every DAO; its result is what gets composed.
   */
  public MultiIndexDao(Iterable<IndexDao> composedDao, Function<IndexDao, IndexDao> decoratorTransformation) {
    this(Iterables.transform(composedDao, x -> decoratorTransformation.apply(x)));
  }

  /**
   * Applies the update to every composed DAO.
   *
   * @param update The document to write.
   * @param index The index to write to, passed through to each DAO unchanged.
   * @return The {@code update} argument.
   * @throws IOException If one or more DAOs failed; see {@link #throwIfAnyFailed(List)}.
   */
  @Override
  public Document update(final Document update, Optional<String> index) throws IOException {
    writeToAll(dao -> dao.update(update, index));
    return update;
  }

  /**
   * Applies the batch of updates to every composed DAO.
   *
   * @param updates The documents to write, passed through to each DAO unchanged.
   * @return The {@code updates} argument.
   * @throws IOException If one or more DAOs failed; see {@link #throwIfAnyFailed(List)}.
   */
  @Override
  public Map<Document, Optional<String>> batchUpdate(Map<Document, Optional<String>> updates) throws IOException {
    writeToAll(dao -> dao.batchUpdate(updates));
    return updates;
  }

  /**
   * Returns the first non-null column metadata produced by the composed DAOs, in list order.
   *
   * @return The metadata, or {@code null} if every DAO returned {@code null}.
   */
  @Override
  public Map<String, FieldType> getColumnMetadata(List<String> in) throws IOException {
    for (IndexDao dao : indices) {
      Map<String, FieldType> metadata = dao.getColumnMetadata(in);
      if (metadata != null) {
        return metadata;
      }
    }
    return null;
  }

  /**
   * Looks up the latest version of the alert named by the request and adds the comment to it.
   */
  @Override
  public Document addCommentToAlert(CommentAddRemoveRequest request) throws IOException {
    Document latest = getLatest(request.getGuid(), request.getSensorType());
    return addCommentToAlert(request, latest);
  }

  /**
   * Adds comments to an alert. Updates are written to each Dao in parallel with the assumption that all updates
   * are identical. Of the documents returned by the Daos, the one with the greatest timestamp is returned as the
   * current version of the alert with comments added.
   * @param request Request to add comments
   * @param latest The latest version of the alert the comments will be added to.
   * @return The complete alert document with comments added.
   * @throws IOException If one or more Daos failed.
   */
  @Override
  public Document addCommentToAlert(CommentAddRemoveRequest request, Document latest) throws IOException {
    List<DocumentContainer> output = indices
        .parallelStream()
        .map(dao -> addCommentToAlert(dao, request, latest))
        .collect(Collectors.toList());
    return getLatestDocument(output);
  }

  /**
   * Adds the comment through a single DAO, capturing either the resulting document or the failure.
   */
  private DocumentContainer addCommentToAlert(IndexDao indexDao, CommentAddRemoveRequest request, Document latest) {
    DocumentContainer container;
    try {
      Document document = indexDao.addCommentToAlert(request, latest);
      container = new DocumentContainer(document);
      // The log arguments are evaluated even when debug is off, so they must tolerate a null
      // document; otherwise the NPE would be caught below and reported as a DAO failure.
      LOG.debug("Added comment to alert; indexDao={}, guid={}, sensorType={}, document={}",
          ClassUtils.getShortClassName(indexDao.getClass()),
          document == null ? null : document.getGuid(),
          document == null ? null : document.getSensorType(),
          document);
    } catch (Throwable e) {
      container = new DocumentContainer(e);
      LOG.error("Unable to add comment to alert; indexDao={}, error={}",
          ClassUtils.getShortClassName(indexDao.getClass()), ExceptionUtils.getRootCauseMessage(e));
    }
    return container;
  }

  /**
   * Looks up the latest version of the alert named by the request and removes the comment from it.
   */
  @Override
  public Document removeCommentFromAlert(CommentAddRemoveRequest request) throws IOException {
    Document latest = getLatest(request.getGuid(), request.getSensorType());
    return removeCommentFromAlert(request, latest);
  }

  /**
   * Removes comments from an alert. Updates are written to each Dao in parallel with the assumption that all updates
   * are identical. Of the documents returned by the Daos, the one with the greatest timestamp is returned as the
   * current version of the alert with comments removed.
   * @param request Request to remove comments
   * @param latest The latest version of the alert the comments will be removed from.
   * @return The complete alert document with comments removed.
   * @throws IOException If one or more Daos failed.
   */
  @Override
  public Document removeCommentFromAlert(CommentAddRemoveRequest request, Document latest) throws IOException {
    List<DocumentContainer> output = indices
        .parallelStream()
        .map(dao -> removeCommentFromAlert(dao, request, latest))
        .collect(Collectors.toList());
    return getLatestDocument(output);
  }

  /**
   * Removes the comment through a single DAO, capturing either the resulting document or the failure.
   */
  private DocumentContainer removeCommentFromAlert(IndexDao indexDao, CommentAddRemoveRequest request, Document latest) {
    DocumentContainer container;
    try {
      Document document = indexDao.removeCommentFromAlert(request, latest);
      container = new DocumentContainer(document);
      // Null-safe for the same reason as in addCommentToAlert: a null document is not a failure.
      LOG.debug("Removed comment from alert; indexDao={}, guid={}, sensorType={}, document={}",
          ClassUtils.getShortClassName(indexDao.getClass()),
          document == null ? null : document.getGuid(),
          document == null ? null : document.getSensorType(),
          document);
    } catch (Throwable e) {
      container = new DocumentContainer(e);
      LOG.error("Unable to remove comment from alert; indexDao={}, error={}",
          ClassUtils.getShortClassName(indexDao.getClass()), ExceptionUtils.getRootCauseMessage(e));
    }
    return container;
  }

  /**
   * The outcome of one DAO call that yields a {@link Document}: the document (possibly absent)
   * or the throwable that the call raised.
   */
  protected static class DocumentContainer {
    private final Optional<Document> d;
    private final Optional<Throwable> t;

    public DocumentContainer(Document d) {
      this.d = Optional.ofNullable(d);
      this.t = Optional.empty();
    }

    public DocumentContainer(Throwable t) {
      this.d = Optional.empty();
      this.t = Optional.ofNullable(t);
    }

    public Optional<Document> getDocument() {
      return d;
    }

    public Optional<Throwable> getException() {
      return t;
    }
  }

  /**
   * The outcome of one DAO call that yields several {@link Document}s: the documents (possibly
   * absent) or the throwable that the call raised.
   */
  private static class DocumentIterableContainer {
    private final Optional<Iterable<Document>> d;
    private final Optional<Throwable> t;

    public DocumentIterableContainer(Iterable<Document> d) {
      this.d = Optional.ofNullable(d);
      this.t = Optional.empty();
    }

    public DocumentIterableContainer(Throwable t) {
      this.d = Optional.empty();
      this.t = Optional.ofNullable(t);
    }

    public Optional<Iterable<Document>> getDocumentIterable() {
      return d;
    }

    public Optional<Throwable> getException() {
      return t;
    }
  }

  /**
   * Returns the first non-null search response produced by the composed DAOs, in list order.
   *
   * @return The response, or {@code null} if every DAO returned {@code null}.
   */
  @Override
  public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException {
    for (IndexDao dao : indices) {
      SearchResponse response = dao.search(searchRequest);
      if (response != null) {
        return response;
      }
    }
    return null;
  }

  /**
   * Returns the first non-null group response produced by the composed DAOs, in list order.
   *
   * @return The response, or {@code null} if every DAO returned {@code null}.
   */
  @Override
  public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException {
    for (IndexDao dao : indices) {
      GroupResponse response = dao.group(groupRequest);
      if (response != null) {
        return response;
      }
    }
    return null;
  }

  /**
   * Initializes every composed DAO, in list order, with the same configuration.
   */
  @Override
  public void init(AccessConfig config) {
    for (IndexDao dao : indices) {
      dao.init(config);
    }
  }

  /**
   * Queries every composed DAO for the document and returns the one with the greatest timestamp.
   *
   * @return The latest document, or {@code null} if no DAO returned one.
   * @throws IOException If one or more DAOs failed.
   */
  @Override
  public Document getLatest(final String guid, String sensorType) throws IOException {
    List<DocumentContainer> output = indices
        .parallelStream()
        .map(dao -> getLatest(dao, guid, sensorType))
        .collect(Collectors.toList());
    return getLatestDocument(output);
  }

  /**
   * Fetches the document from a single DAO, capturing either the document or the failure.
   */
  private DocumentContainer getLatest(IndexDao indexDao, String guid, String sensorType) {
    DocumentContainer container;
    try {
      Document document = indexDao.getLatest(guid, sensorType);
      container = new DocumentContainer(document);
      LOG.debug("Found latest document; indexDao={}, guid={}, sensorType={}, document={}",
          ClassUtils.getShortClassName(indexDao.getClass()), guid, sensorType, document);
    } catch (Throwable e) {
      container = new DocumentContainer(e);
      LOG.error("Unable to find latest document; indexDao={}, error={}",
          ClassUtils.getShortClassName(indexDao.getClass()), ExceptionUtils.getRootCauseMessage(e));
    }
    return container;
  }

  /**
   * Queries every composed DAO and returns the first non-null result, in list order. Results
   * from different DAOs are not merged.
   *
   * @return The documents, or {@code null} if every DAO returned {@code null}.
   * @throws IOException If one or more DAOs failed.
   */
  @Override
  public Iterable<Document> getAllLatest(
      List<GetRequest> getRequests) throws IOException {
    List<DocumentIterableContainer> output =
        indices.parallelStream().map(dao -> {
          try {
            return new DocumentIterableContainer(dao.getAllLatest(getRequests));
          } catch (Throwable e) {
            return new DocumentIterableContainer(e);
          }
        }).collect(Collectors.toList());

    Iterable<Document> ret = null;
    List<DaoFailure> failures = new ArrayList<>();
    for (DocumentIterableContainer dc : output) {
      if (dc.getException().isPresent()) {
        failures.add(describe(dc.getException().get()));
      } else if (ret == null && dc.getDocumentIterable().isPresent()) {
        ret = dc.getDocumentIterable().get();
      }
    }
    throwIfAnyFailed(failures);
    return ret;
  }

  /**
   * @return The live, mutable list of composed DAOs backing this instance.
   */
  public List<IndexDao> getIndices() {
    return indices;
  }

  /**
   * Returns the most recent {@link Document} from a list of {@link DocumentContainer}s.
   *
   * @param documentContainers A list of containers; each retrieved from a separate index.
   * @return The latest {@link Document} found, or {@code null} if no container holds a document.
   * @throws IOException If any of the {@link DocumentContainer}s contain an exception.
   */
  private Document getLatestDocument(List<DocumentContainer> documentContainers) throws IOException {
    Document latestDocument = null;
    List<DaoFailure> failures = new ArrayList<>();

    for (DocumentContainer dc : documentContainers) {
      if (dc.getException().isPresent()) {
        // collect each exception; multiple can occur, one in each index
        failures.add(describe(dc.getException().get()));
      } else if (dc.getDocument().isPresent()) {
        Document d = dc.getDocument().get();
        // is this the latest document so far?
        if (latestDocument == null || latestDocument.getTimestamp() < d.getTimestamp()) {
          latestDocument = d;
        }
      }
      // otherwise no document was found in that index
    }

    // report all of the errors encountered
    throwIfAnyFailed(failures);
    return latestDocument;
  }

  /** A write against a single DAO; it may fail with any exception. */
  @FunctionalInterface
  private interface DaoWrite {
    void accept(IndexDao dao) throws Exception;
  }

  /** A failure raised by one DAO, paired with the text that describes it in the aggregate message. */
  private static final class DaoFailure {
    private final String description;
    private final Throwable cause;

    private DaoFailure(String description, Throwable cause) {
      this.description = description;
      this.cause = cause;
    }
  }

  /**
   * Runs the write against every composed DAO through a parallel stream. Every DAO is attempted
   * even if another fails.
   *
   * @throws IOException If one or more DAOs failed.
   */
  private void writeToAll(DaoWrite write) throws IOException {
    List<DaoFailure> failures = indices.parallelStream()
        .map(dao -> attemptWrite(dao, write))
        .filter(failure -> failure != null)
        .collect(Collectors.toList());
    throwIfAnyFailed(failures);
  }

  /**
   * Runs the write against one DAO.
   *
   * @return {@code null} on success, otherwise the failure prefixed with the DAO's class.
   */
  private static DaoFailure attemptWrite(IndexDao dao, DaoWrite write) {
    try {
      write.accept(dao);
      return null;
    } catch (Throwable e) {
      return new DaoFailure(
          dao.getClass() + ": " + e.getMessage() + "\n" + ExceptionUtils.getStackTrace(e), e);
    }
  }

  /** Describes a throwable by its message followed by its stack trace. */
  private static DaoFailure describe(Throwable e) {
    return new DaoFailure(e.getMessage() + "\n" + ExceptionUtils.getStackTrace(e), e);
  }

  /**
   * Throws a single {@link IOException} when at least one failure was collected. The message is
   * the newline-joined failure descriptions; each original throwable is attached as a suppressed
   * exception so callers can inspect it.
   */
  private static void throwIfAnyFailed(List<DaoFailure> failures) throws IOException {
    if (failures.isEmpty()) {
      return;
    }
    List<String> descriptions = new ArrayList<>(failures.size());
    for (DaoFailure failure : failures) {
      descriptions.add(failure.description);
    }
    IOException aggregate = new IOException(Joiner.on("\n").join(descriptions));
    for (DaoFailure failure : failures) {
      aggregate.addSuppressed(failure.cause);
    }
    throw aggregate;
  }
}