blob: 705beda5afc89878935810dd30e2bef9c8d33f55 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.elasticsearch.integration;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.metron.common.Constants;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.elasticsearch.client.ElasticsearchClient;
import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.IndexDao;
import org.apache.metron.indexing.dao.SearchIntegrationTest;
import org.apache.metron.indexing.dao.search.FieldType;
import org.apache.metron.indexing.dao.search.GroupRequest;
import org.apache.metron.indexing.dao.search.InvalidSearchException;
import org.apache.metron.indexing.dao.search.SearchRequest;
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.search.SearchResult;
import org.apache.metron.integration.InMemoryComponent;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
/**
 * Runs the shared {@link SearchIntegrationTest} suite against {@link ElasticsearchDao}.
 *
 * <p>{@link #setup()} starts an {@link ElasticSearchComponent}, installs the bro and snort index
 * templates (extended with a few extra typed test fields), creates one index per sensor and loads
 * the test documents once for the whole class.
 *
 * <p>Fields such as {@code indexComponent}, {@code thrown}, {@code broData}, {@code snortData} and
 * the query strings used below are not declared in this class; they are presumably inherited from
 * {@link SearchIntegrationTest}.
 */
public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest {

  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private static final String INDEX_DIR = "target/elasticsearch_search";
  private static final String DATE_FORMAT = "yyyy.MM.dd.HH";
  private static final String BRO_TEMPLATE_PATH = "../../metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template";
  private static final String SNORT_TEMPLATE_PATH = "../../metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template";

  protected static final String BRO_INDEX = "bro_index_2017.01.01.01";
  protected static final String SNORT_INDEX = "snort_index_2017.01.01.02";

  protected static Map<String, Object> globalConfig;
  protected static AccessConfig accessConfig;
  protected static RestClient lowLevelClient;
  protected static RestHighLevelClient highLevelClient;
  protected static IndexDao dao;

  /**
   * Builds the global and access configuration, starts the index component, creates the
   * Elasticsearch clients and the DAO under test, then loads the static test data.
   *
   * @throws Exception if the component cannot be started or the test data cannot be loaded
   */
  @BeforeClass
  public static void setup() throws Exception {
    // A plain HashMap avoids the anonymous subclass created by double-brace initialization.
    globalConfig = new HashMap<>();
    globalConfig.put("es.clustername", "metron");
    globalConfig.put("es.port", "9200");
    globalConfig.put("es.ip", "localhost");
    globalConfig.put("es.date.format", DATE_FORMAT);

    accessConfig = new AccessConfig();
    accessConfig.setMaxSearchResults(100);
    accessConfig.setMaxSearchGroups(100);
    accessConfig.setGlobalConfigSupplier(() -> globalConfig);

    // startIndex() reads accessConfig, so it must run after the configuration above.
    indexComponent = startIndex();

    ElasticsearchClient esClient = ElasticsearchClientFactory.create(globalConfig);
    lowLevelClient = esClient.getLowLevelClient();
    highLevelClient = esClient.getHighLevelClient();

    dao = new ElasticsearchDao();
    dao.init(accessConfig);

    // The data is all static for searches, so we can set it up beforehand, and it's faster
    loadTestData();
  }

  /**
   * Builds and starts the Elasticsearch component backing this test class.
   *
   * @return the started component
   * @throws Exception if the component fails to start
   */
  protected static InMemoryComponent startIndex() throws Exception {
    // NOTE(review): the HTTP port given here (9211) differs from "es.port" (9200) in the global
    // config built by setup() - confirm this is intentional.
    InMemoryComponent es = new ElasticSearchComponent.Builder()
        .withHttpPort(9211)
        .withIndexDir(new File(INDEX_DIR))
        .withAccessConfig(accessConfig)
        .build();
    es.start();
    return es;
  }

  /**
   * Installs the bro and snort templates, creates both indices and writes the test documents.
   *
   * @throws Exception if a template cannot be read, a request fails or the test data is not
   *     parseable
   */
  protected static void loadTestData() throws Exception {
    ElasticSearchComponent es = (ElasticSearchComponent) indexComponent;

    // templates must exist before the indices are created
    putIndexTemplate("/_template/bro_template", BRO_TEMPLATE_PATH, "bro_doc");
    putIndexTemplate("/_template/snort_template", SNORT_TEMPLATE_PATH, "snort_doc");

    createEmptyIndex(BRO_INDEX);
    createEmptyIndex(SNORT_INDEX);

    // write the test documents for Bro
    List<String> broDocuments = parseDocuments(broData);
    // add documents using Metron GUID
    es.add(BRO_INDEX, "bro", broDocuments.subList(0, 4), true);
    // add a document to the same index but with an Elasticsearch id
    es.add(BRO_INDEX, "bro", broDocuments.subList(4, 5), false);

    // write the test documents for Snort
    List<String> snortDocuments = parseDocuments(snortData);
    es.add(SNORT_INDEX, "snort", snortDocuments);
  }

  /**
   * Loads an index template from disk, adds the test field mappings and PUTs it to the cluster,
   * asserting a 200 response.
   *
   * @param endpoint the REST endpoint to PUT the template to, e.g. {@code /_template/bro_template}
   * @param templatePath path of the template file to load
   * @param docType the document type key under {@code mappings} that receives the test fields
   */
  private static void putIndexTemplate(String endpoint, String templatePath, String docType)
      throws Exception {
    JSONObject template = JSONUtils.INSTANCE.load(new File(templatePath), JSONObject.class);
    addTestFieldMappings(template, docType);
    String templateJson = JSONUtils.INSTANCE.toJSON(template, true);
    HttpEntity entity = new NStringEntity(templateJson, ContentType.APPLICATION_JSON);
    Response response =
        lowLevelClient.performRequest("PUT", endpoint, Collections.emptyMap(), entity);
    assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
  }

  /**
   * Creates an index with the given name and asserts a 200 response.
   *
   * @param indexName name of the index to create
   */
  private static void createEmptyIndex(String indexName) throws Exception {
    Response response = lowLevelClient.performRequest("PUT", indexName);
    assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
  }

  /**
   * Parses a JSON array of objects and returns each element serialized back to a JSON string.
   *
   * @param jsonArray text containing a JSON array whose elements are JSON objects
   * @return one JSON string per array element, in array order
   */
  private static List<String> parseDocuments(String jsonArray) throws Exception {
    List<String> documents = new ArrayList<>();
    for (Object element : (JSONArray) new JSONParser().parse(jsonArray)) {
      documents.add(((JSONObject) element).toJSONString());
    }
    return documents;
  }

  /**
   * Add test fields to a template with defined types in case they are not defined in the sensor
   * template shipped with Metron. This is useful for testing certain cases, for example faceting
   * on fields of various types.
   * Template follows this pattern:
   * { "mappings" : { "xxx_doc" : { "properties" : { ... }}}}
   *
   * @param template - this method has side effects - template is modified with field mappings.
   * @param docType the document type key under "mappings" whose properties are extended
   */
  @SuppressWarnings("unchecked") // the template is untyped JSON; the shape is checked below
  private static void addTestFieldMappings(JSONObject template, String docType) {
    Map<String, Object> mappings = (Map<String, Object>) template.get("mappings");
    Assert.assertNotNull("Template has no 'mappings' section", mappings);
    Map<String, Object> docTypeJson = (Map<String, Object>) mappings.get(docType);
    Assert.assertNotNull("Template has no mapping for doc type '" + docType + "'", docTypeJson);
    Map<String, Object> properties = (Map<String, Object>) docTypeJson.get("properties");
    Assert.assertNotNull("Mapping for '" + docType + "' has no 'properties'", properties);

    properties.put("long_field", typeMapping("long"));
    properties.put("latitude", typeMapping("float"));
    properties.put("score", typeMapping("double"));
  }

  /**
   * Builds the mapping fragment <code>{ "type" : type }</code> for a single field.
   *
   * @param type the Elasticsearch field type name
   * @return a new mutable map holding only the "type" entry
   */
  private static Map<String, String> typeMapping(String type) {
    Map<String, String> mapping = new HashMap<>();
    mapping.put("type", type);
    return mapping;
  }

  /** A search built from {@code badFacetQuery} must fail with an InvalidSearchException. */
  @Test
  public void bad_facet_query_throws_exception() throws Exception {
    thrown.expect(InvalidSearchException.class);
    thrown.expectMessage("Failed to execute search");
    SearchRequest request = JSONUtils.INSTANCE.load(badFacetQuery, SearchRequest.class);
    dao.search(request);
  }

  /** Checks the column metadata reported for the bro and the snort indices individually. */
  @Override
  public void returns_column_metadata_for_specified_indices() throws Exception {
    // getColumnMetadata with only bro
    {
      Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("bro"));
      Assert.assertEquals(262, fieldTypes.size());
      Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("method"));
      Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("ttl"));
      Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid"));
      Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("source:type"));
      Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
      Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
      Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
      Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
      Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
      Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("location_point"));
      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("metron_alert"));
    }
    // getColumnMetadata with only snort
    {
      Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("snort"));
      Assert.assertEquals(32, fieldTypes.size());
      Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("sig_generator"));
      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ttl"));
      Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid"));
      Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("source:type"));
      Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
      Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
      Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
      Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
      Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
      Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("location_point"));
      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("metron_alert"));
    }
  }

  /** Checks the merged column metadata reported when bro and snort are requested together. */
  @Override
  public void returns_column_data_for_multiple_indices() throws Exception {
    Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Arrays.asList("bro", "snort"));
    Assert.assertEquals(277, fieldTypes.size());

    // Ensure internal Metron fields are properly defined
    Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid"));
    Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("source:type"));
    Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("threat:triage:score"));
    Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("alert_status"));
    Assert.assertEquals(FieldType.OTHER, fieldTypes.get("metron_alert"));
    Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
    Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
    Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
    Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
    Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
    Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
    Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("suppress_for"));
    Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));

    // Ensure a field defined only in bro is included
    Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("method"));

    // Ensure a field defined only in snort is included
    Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("sig_generator"));

    // Ensure fields in both bro and snort have type OTHER because they have different types
    Assert.assertEquals(FieldType.OTHER, fieldTypes.get("ttl"));
    Assert.assertEquals(FieldType.OTHER, fieldTypes.get("msg"));
  }

  /** A group request built from {@code badGroupQuery} must fail with an InvalidSearchException. */
  @Test
  public void throws_exception_on_aggregation_queries_on_non_string_non_numeric_fields()
      throws Exception {
    thrown.expect(InvalidSearchException.class);
    thrown.expectMessage("Failed to execute search");
    GroupRequest request = JSONUtils.INSTANCE.load(badGroupQuery, GroupRequest.class);
    dao.group(request);
  }

  /**
   * Runs {@code differentTypeFilterQuery} and expects exactly one hit: a bro document whose
   * {@code ttl} value is the string "data 1".
   */
  @Test
  public void different_type_filter_query() throws Exception {
    SearchRequest request = JSONUtils.INSTANCE.load(differentTypeFilterQuery, SearchRequest.class);
    SearchResponse response = dao.search(request);
    Assert.assertEquals(1, response.getTotal());

    Map<String, Object> firstSource = response.getResults().get(0).getSource();
    Assert.assertEquals("bro", firstSource.get("source:type"));
    Assert.assertEquals("data 1", firstSource.get("ttl"));
  }

  /** Returns the sensor type field name with '.' replaced by ':'. */
  @Override
  protected String getSourceTypeField() {
    return Constants.SENSOR_TYPE.replace('.', ':');
  }

  /** Returns the DAO created in {@link #setup()}. */
  @Override
  protected IndexDao getIndexDao() {
    return dao;
  }

  /**
   * Maps a sensor type to its test index.
   *
   * @param sensorType the sensor type; "bro" selects the bro index, anything else the snort index
   * @return the index name for the sensor type
   */
  @Override
  protected String getIndexName(String sensorType) {
    return "bro".equals(sensorType) ? BRO_INDEX : SNORT_INDEX;
  }
}