blob: 4bc9f8a469f81d25a4b7a5a36765d6444b83e5aa [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.solr.integration.components;
import com.google.common.base.Function;
import java.util.Collection;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.apache.metron.common.Constants;
import org.apache.metron.indexing.dao.metaalert.MetaAlertConstants;
import org.apache.metron.integration.InMemoryComponent;
import org.apache.metron.integration.UnableToStartException;
import org.apache.metron.solr.dao.SolrUtilities;
import org.apache.metron.solr.writer.MetronSolrClient;
import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.embedded.JettyConfig;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.MiniSolrCloudCluster;
import org.apache.solr.common.SolrDocument;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.solr.common.SolrInputDocument;
import org.apache.zookeeper.KeeperException;
public class SolrComponent implements InMemoryComponent {
public static class Builder {
private int port = 8983;
private String solrXmlPath = "../metron-solr/src/test/resources/solr/solr.xml";
private Map<String, String> initialCollections = new HashMap<>();
private Function<SolrComponent, Void> postStartCallback;
public Builder withPort(int port) {
this.port = port;
return this;
}
public Builder withSolrXmlPath(String solrXmlPath) {
this.solrXmlPath = solrXmlPath;
return this;
}
public Builder addInitialCollection(String name, String configPath) {
initialCollections.put(name, configPath);
return this;
}
public Builder withPostStartCallback(Function<SolrComponent, Void> f) {
postStartCallback = f;
return this;
}
public SolrComponent build() {
return new SolrComponent(port, solrXmlPath, initialCollections, postStartCallback);
}
}
private int port;
private String solrXmlPath;
private Map<String, String> collections;
private MiniSolrCloudCluster miniSolrCloudCluster;
private Function<SolrComponent, Void> postStartCallback;
private SolrComponent(int port, String solrXmlPath, Map<String, String> collections,
Function<SolrComponent, Void> postStartCallback) {
this.port = port;
this.solrXmlPath = solrXmlPath;
this.collections = collections;
this.postStartCallback = postStartCallback;
}
@Override
public void start() throws UnableToStartException {
try {
File baseDir = Files.createTempDirectory("solrcomponent").toFile();
baseDir.deleteOnExit();
miniSolrCloudCluster = new MiniSolrCloudCluster(1, baseDir.toPath(),
JettyConfig.builder().setPort(port).build());
for(String name: collections.keySet()) {
String configPath = collections.get(name);
miniSolrCloudCluster.uploadConfigSet(new File(configPath).toPath(), name);
CollectionAdminRequest.createCollection(name, 1, 1).process(miniSolrCloudCluster.getSolrClient());
}
if (postStartCallback != null) {
postStartCallback.apply(this);
}
} catch (Exception e) {
throw new UnableToStartException(e.getMessage(), e);
}
}
@Override
public void stop() {
try {
miniSolrCloudCluster.deleteAllCollections();
miniSolrCloudCluster.shutdown();
} catch (Exception e) {
// Do nothing
}
}
@Override
public void reset() {
try {
miniSolrCloudCluster.deleteAllCollections();
} catch (Exception e) {
// Do nothing
}
}
public MetronSolrClient getSolrClient() {
return new MetronSolrClient(getZookeeperUrl());
}
public MiniSolrCloudCluster getMiniSolrCloudCluster() {
return this.miniSolrCloudCluster;
}
public String getZookeeperUrl() {
return miniSolrCloudCluster.getZkServer().getZkAddress();
}
public void addCollection(String name, String configPath)
throws InterruptedException, IOException, KeeperException, SolrServerException {
miniSolrCloudCluster.uploadConfigSet(new File(configPath).toPath(), name);
CollectionAdminRequest.createCollection(name, 1, 1)
.process(miniSolrCloudCluster.getSolrClient());
}
public boolean hasCollection(String collection) {
MetronSolrClient solr = getSolrClient();
boolean collectionFound = false;
try {
collectionFound = solr.listCollections().contains(collection);
} catch (Exception e) {
e.printStackTrace();
}
return collectionFound;
}
public List<Map<String, Object>> getAllIndexedDocs(String collection) {
List<Map<String, Object>> docs = new ArrayList<>();
CloudSolrClient solr = miniSolrCloudCluster.getSolrClient();
solr.setDefaultCollection(collection);
SolrQuery parameters = new SolrQuery();
// If it's metaalert, we need to adjust the query. We want child docs with the parent,
// not separate.
if (collection.equals("metaalert")) {
parameters.setQuery("source.type:metaalert")
.setFields("*", "[child parentFilter=source.type:metaalert limit=999]");
} else {
parameters.set("q", "*:*");
}
try {
solr.commit();
QueryResponse response = solr.query(parameters);
for (SolrDocument solrDocument : response.getResults()) {
// Use the utils to make sure we get child docs.
docs.add(SolrUtilities.toDocument(solrDocument).getDocument());
}
} catch (SolrServerException | IOException e) {
e.printStackTrace();
}
return docs;
}
public void addDocs(String collection, List<Map<String, Object>> docs)
throws IOException, SolrServerException {
CloudSolrClient solr = miniSolrCloudCluster.getSolrClient();
solr.setDefaultCollection(collection);
Collection<SolrInputDocument> solrInputDocuments = docs.stream().map(doc -> {
SolrInputDocument solrInputDocument = new SolrInputDocument();
for (Entry<String, Object> entry : doc.entrySet()) {
// If the entry itself is a map, add it as a child document. Handle one level of nesting.
if (entry.getValue() instanceof List && !entry.getKey().equals(
MetaAlertConstants.METAALERT_FIELD)) {
for (Object entryItem : (List)entry.getValue()) {
if (entryItem instanceof Map) {
@SuppressWarnings("unchecked")
Map<String, Object> childDoc = (Map<String, Object>) entryItem;
SolrInputDocument childInputDoc = new SolrInputDocument();
for (Entry<String, Object> childEntry : childDoc.entrySet()) {
childInputDoc.addField(childEntry.getKey(), childEntry.getValue());
}
solrInputDocument.addChildDocument(childInputDoc);
}
}
} else {
solrInputDocument.addField(entry.getKey(), entry.getValue());
}
}
return solrInputDocument;
}).collect(Collectors.toList());
checkUpdateResponse(solr.add(collection, solrInputDocuments));
// Make sure to commit so things show up
checkUpdateResponse(solr.commit(true, true));
}
protected void checkUpdateResponse(UpdateResponse result) throws IOException {
if (result.getStatus() != 0) {
throw new IOException("Response error received while adding documents: " + result);
}
}
}