METRON-1301 Alerts UI - Sorting on Triage Score Unexpectedly Filters Some Records (nickwallen) closes apache/metron#832
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template
index 7db006e..3a68d75 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template
@@ -98,7 +98,7 @@
"mapping": {
"type": "float"
},
- "match": "threat.triage.rules:*:score",
+ "match": "threat:triage:*score",
"match_mapping_type": "*"
}
},
@@ -107,7 +107,7 @@
"mapping": {
"type": "string"
},
- "match": "threat.triage.rules:*:reason",
+ "match": "threat:triage:rules:*:reason",
"match_mapping_type": "*"
}
},
@@ -116,9 +116,9 @@
"mapping": {
"type": "string"
},
- "match": "threat.triage.rules:*:name",
+ "match": "threat:triage:rules:*:name",
"match_mapping_type": "*"
- }
+ }
}
],
"properties": {
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template
index f13a9ee..7c6b401 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template
@@ -98,28 +98,28 @@
"mapping": {
"type": "float"
},
- "match": "threat.triage.rules:*:score",
+ "match": "threat:triage:*score",
"match_mapping_type": "*"
}
},
- {
- "threat_triage_reason": {
- "mapping": {
- "type": "string"
- },
- "match": "threat.triage.rules:*:reason",
- "match_mapping_type": "*"
- }
- },
- {
- "threat_triage_name": {
- "mapping": {
- "type": "string"
- },
- "match": "threat.triage.rules:*:name",
- "match_mapping_type": "*"
- }
+ {
+ "threat_triage_reason": {
+ "mapping": {
+ "type": "string"
+ },
+ "match": "threat:triage:rules:*:reason",
+ "match_mapping_type": "*"
}
+ },
+ {
+ "threat_triage_name": {
+ "mapping": {
+ "type": "string"
+ },
+ "match": "threat:triage:rules:*:name",
+ "match_mapping_type": "*"
+ }
+ }
],
"properties": {
"timestamp": {
@@ -195,9 +195,6 @@
"tcpwindow": {
"type": "string"
},
- "threat:triage:level": {
- "type": "double"
- },
"tos": {
"type": "integer"
},
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template
index d84235d..d100eb0 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template
@@ -98,7 +98,7 @@
"mapping": {
"type": "float"
},
- "match": "threat.triage.rules:*:score",
+ "match": "threat:triage:*score",
"match_mapping_type": "*"
}
},
@@ -107,7 +107,7 @@
"mapping": {
"type": "string"
},
- "match": "threat.triage.rules:*:reason",
+ "match": "threat:triage:rules:*:reason",
"match_mapping_type": "*"
}
},
@@ -116,7 +116,7 @@
"mapping": {
"type": "string"
},
- "match": "threat.triage.rules:*:name",
+ "match": "threat:triage:rules:*:name",
"match_mapping_type": "*"
}
}
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
index 4ce9644..25bb809 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
@@ -34,6 +34,10 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
@Configuration
public class IndexConfig {
@@ -57,6 +61,7 @@
int searchMaxGroups = environment.getProperty(MetronRestConstants.SEARCH_MAX_GROUPS, Integer.class, 1000);
String metaDaoImpl = environment.getProperty(MetronRestConstants.META_DAO_IMPL, String.class, null);
String metaDaoSort = environment.getProperty(MetronRestConstants.META_DAO_SORT, String.class, null);
+
AccessConfig config = new AccessConfig();
config.setMaxSearchResults(searchMaxResults);
config.setMaxSearchGroups(searchMaxGroups);
@@ -84,6 +89,7 @@
MetaAlertDao ret = (MetaAlertDao) IndexDaoFactory.create(metaDaoImpl, config).get(0);
ret.init(indexDao, Optional.ofNullable(metaDaoSort));
return ret;
+
}
catch(RuntimeException re) {
throw re;
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
index 3673654..78a1e20 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
@@ -89,8 +89,8 @@
public void setup() throws Exception {
this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build();
ImmutableMap<String, String> testData = ImmutableMap.of(
- "bro_index_2017.01.01.01", SearchIntegrationTest.broData,
- "snort_index_2017.01.01.01", SearchIntegrationTest.snortData
+ "bro_index_2017.01.01.01", SearchIntegrationTest.broData,
+ "snort_index_2017.01.01.01", SearchIntegrationTest.snortData
);
loadTestData(testData);
loadColumnTypes();
@@ -114,19 +114,19 @@
}});
assertEventually(() -> this.mockMvc.perform(post(searchUrl + "/search").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(defaultQuery))
- .andExpect(status().isOk())
- .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
- .andExpect(jsonPath("$.total").value(5))
- .andExpect(jsonPath("$.results[0].source.source:type").value("bro"))
- .andExpect(jsonPath("$.results[0].source.timestamp").value(5))
- .andExpect(jsonPath("$.results[1].source.source:type").value("bro"))
- .andExpect(jsonPath("$.results[1].source.timestamp").value(4))
- .andExpect(jsonPath("$.results[2].source.source:type").value("bro"))
- .andExpect(jsonPath("$.results[2].source.timestamp").value(3))
- .andExpect(jsonPath("$.results[3].source.source:type").value("bro"))
- .andExpect(jsonPath("$.results[3].source.timestamp").value(2))
- .andExpect(jsonPath("$.results[4].source.source:type").value("bro"))
- .andExpect(jsonPath("$.results[4].source.timestamp").value(1))
+ .andExpect(status().isOk())
+ .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+ .andExpect(jsonPath("$.total").value(5))
+ .andExpect(jsonPath("$.results[0].source.source:type").value("bro"))
+ .andExpect(jsonPath("$.results[0].source.timestamp").value(5))
+ .andExpect(jsonPath("$.results[1].source.source:type").value("bro"))
+ .andExpect(jsonPath("$.results[1].source.timestamp").value(4))
+ .andExpect(jsonPath("$.results[2].source.source:type").value("bro"))
+ .andExpect(jsonPath("$.results[2].source.timestamp").value(3))
+ .andExpect(jsonPath("$.results[3].source.source:type").value("bro"))
+ .andExpect(jsonPath("$.results[3].source.timestamp").value(2))
+ .andExpect(jsonPath("$.results[4].source.source:type").value("bro"))
+ .andExpect(jsonPath("$.results[4].source.timestamp").value(1))
);
sensorIndexingConfigService.delete("bro");
@@ -288,4 +288,4 @@
columnTypes.put("snort", snortTypes);
InMemoryDao.setColumnMetadata(columnTypes);
}
-}
+}
\ No newline at end of file
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java
new file mode 100644
index 0000000..0393629
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import org.apache.metron.indexing.dao.search.FieldType;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Responsible for retrieving column-level metadata about search indices.
+ */
+public interface ColumnMetadataDao {
+
+ /**
+ * Retrieves column metadata for one or more search indices.
+ * @param indices The search indices to retrieve column metadata for.
+ * @return The column metadata, one set for each search index.
+ * @throws IOException
+ */
+ Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException;
+
+ /**
+ * Finds the latest version of a set of base indices. This can be used to find
+ * the latest 'bro' index, for example.
+ *
+ * Assuming the following indices exist...
+ *
+ * [
+ * 'bro_index_2017.10.03.19'
+ * 'bro_index_2017.10.03.20',
+ * 'bro_index_2017.10.03.21',
+ * 'snort_index_2017.10.03.19',
+ * 'snort_index_2017.10.03.20',
+ * 'snort_index_2017.10.03.21'
+ * ]
+ *
+ * And the include indices are given as...
+ *
+ * ['bro', 'snort']
+ *
+ * Then the latest indices are...
+ *
+ * ['bro_index_2017.10.03.21', 'snort_index_2017.10.03.21']
+ *
+ * @param includeIndices The base names of the indices to include
+ * @return The latest version of a set of indices.
+ */
+ String[] getLatestIndices(List<String> includeIndices);
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
new file mode 100644
index 0000000..8e210b4
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
+import org.elasticsearch.client.AdminClient;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
+
+/**
+ * Responsible for retrieving column-level metadata for Elasticsearch search indices.
+ */
+public class ElasticsearchColumnMetadataDao implements ColumnMetadataDao {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private static Map<String, FieldType> elasticsearchTypeMap;
+ static {
+ Map<String, FieldType> fieldTypeMap = new HashMap<>();
+ fieldTypeMap.put("string", FieldType.STRING);
+ fieldTypeMap.put("ip", FieldType.IP);
+ fieldTypeMap.put("integer", FieldType.INTEGER);
+ fieldTypeMap.put("long", FieldType.LONG);
+ fieldTypeMap.put("date", FieldType.DATE);
+ fieldTypeMap.put("float", FieldType.FLOAT);
+ fieldTypeMap.put("double", FieldType.DOUBLE);
+ fieldTypeMap.put("boolean", FieldType.BOOLEAN);
+ elasticsearchTypeMap = Collections.unmodifiableMap(fieldTypeMap);
+ }
+
+ /**
+ * An Elasticsearch administrative client.
+ */
+ private transient AdminClient adminClient;
+
+ /**
+ * @param adminClient The Elasticsearch admin client.
+ */
+ public ElasticsearchColumnMetadataDao(AdminClient adminClient) {
+ this.adminClient = adminClient;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException {
+ Map<String, FieldType> indexColumnMetadata = new HashMap<>();
+ Map<String, String> previousIndices = new HashMap<>();
+ Set<String> fieldBlackList = new HashSet<>();
+
+ String[] latestIndices = getLatestIndices(indices);
+ if (latestIndices.length > 0) {
+ ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = adminClient
+ .indices()
+ .getMappings(new GetMappingsRequest().indices(latestIndices))
+ .actionGet()
+ .getMappings();
+
+ // for each index
+ for (Object key : mappings.keys().toArray()) {
+ String indexName = key.toString();
+ ImmutableOpenMap<String, MappingMetaData> mapping = mappings.get(indexName);
+
+ // for each mapping in the index
+ Iterator<String> mappingIterator = mapping.keysIt();
+ while (mappingIterator.hasNext()) {
+ MappingMetaData mappingMetaData = mapping.get(mappingIterator.next());
+ Map<String, Map<String, String>> map = (Map<String, Map<String, String>>) mappingMetaData
+ .getSourceAsMap().get("properties");
+
+ // for each field in the mapping
+ for (String field : map.keySet()) {
+ if (!fieldBlackList.contains(field)) {
+ FieldType type = toFieldType(map.get(field).get("type"));
+
+ if(!indexColumnMetadata.containsKey(field)) {
+ indexColumnMetadata.put(field, type);
+
+ // record the last index in which a field exists, to be able to print helpful error message on type mismatch
+ previousIndices.put(field, indexName);
+
+ } else {
+ FieldType previousType = indexColumnMetadata.get(field);
+ if (!type.equals(previousType)) {
+ String previousIndexName = previousIndices.get(field);
+ LOG.error(String.format(
+ "Field type mismatch: %s.%s has type %s while %s.%s has type %s. Defaulting type to %s.",
+ indexName, field, type.getFieldType(),
+ previousIndexName, field, previousType.getFieldType(),
+ FieldType.OTHER.getFieldType()));
+ indexColumnMetadata.put(field, FieldType.OTHER);
+
+ // the field is defined in multiple indices with different types; ignore the field as type has been set to OTHER
+ fieldBlackList.add(field);
+ }
+ }
+ }
+ }
+ }
+ }
+ } else {
+ LOG.info(String.format("Unable to find any latest indices; indices=%s", indices));
+ }
+
+ return indexColumnMetadata;
+
+ }
+
+ /**
+ * Retrieves the latest indices.
+ * @param includeIndices
+ * @return
+ */
+ @Override
+ public String[] getLatestIndices(List<String> includeIndices) {
+ LOG.debug("Getting latest indices; indices={}", includeIndices);
+ Map<String, String> latestIndices = new HashMap<>();
+ String[] indices = adminClient
+ .indices()
+ .prepareGetIndex()
+ .setFeatures()
+ .get()
+ .getIndices();
+
+ for (String index : indices) {
+ int prefixEnd = index.indexOf(INDEX_NAME_DELIMITER);
+ if (prefixEnd != -1) {
+ String prefix = index.substring(0, prefixEnd);
+ if (includeIndices.contains(prefix)) {
+ String latestIndex = latestIndices.get(prefix);
+ if (latestIndex == null || index.compareTo(latestIndex) > 0) {
+ latestIndices.put(prefix, index);
+ }
+ }
+ }
+ }
+
+ return latestIndices.values().toArray(new String[latestIndices.size()]);
+ }
+
+ /**
+ * Converts a string type to the corresponding FieldType.
+ * @param type The type to convert.
+ * @return The corresponding FieldType or FieldType.OTHER, if no match.
+ */
+ private FieldType toFieldType(String type) {
+ return elasticsearchTypeMap.getOrDefault(type, FieldType.OTHER);
+ }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index 87ad7f7..910c09b 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -17,26 +17,8 @@
*/
package org.apache.metron.elasticsearch.dao;
-import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
-
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.IndexDao;
@@ -52,19 +34,16 @@
import org.apache.metron.indexing.dao.search.SearchRequest;
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.search.SearchResult;
+import org.apache.metron.indexing.dao.search.SortField;
import org.apache.metron.indexing.dao.search.SortOrder;
import org.apache.metron.indexing.dao.update.Document;
import org.elasticsearch.action.ActionWriteResponse.ShardInfo;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.index.mapper.ip.IpFieldMapper;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
@@ -80,18 +59,64 @@
import org.elasticsearch.search.aggregations.metrics.sum.Sum;
import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.FieldSortBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
public class ElasticsearchDao implements IndexDao {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ /**
+ * The value required to ensure that Elasticsearch sorts missing values last.
+ */
+ private static final String SORT_MISSING_LAST = "_last";
+
+ /**
+ * The value required to ensure that Elasticsearch sorts missing values last.
+ */
+ private static final String SORT_MISSING_FIRST = "_first";
+
+ /**
+ * The Elasticsearch client.
+ */
private transient TransportClient client;
+
+ /**
+ * Retrieves column metadata about search indices.
+ */
+ private ColumnMetadataDao columnMetadataDao;
+
+ /**
+ * Handles the submission of search requests to Elasticsearch.
+ */
+ private ElasticsearchRequestSubmitter requestSubmitter;
+
private AccessConfig accessConfig;
- protected ElasticsearchDao(TransportClient client, AccessConfig config) {
+ protected ElasticsearchDao(TransportClient client,
+ ColumnMetadataDao columnMetadataDao,
+ ElasticsearchRequestSubmitter requestSubmitter,
+ AccessConfig config) {
this.client = client;
+ this.columnMetadataDao = columnMetadataDao;
+ this.requestSubmitter = requestSubmitter;
this.accessConfig = config;
}
@@ -99,21 +124,6 @@
//uninitialized.
}
- private static Map<String, FieldType> elasticsearchSearchTypeMap;
-
- static {
- Map<String, FieldType> fieldTypeMap = new HashMap<>();
- fieldTypeMap.put("string", FieldType.STRING);
- fieldTypeMap.put("ip", FieldType.IP);
- fieldTypeMap.put("integer", FieldType.INTEGER);
- fieldTypeMap.put("long", FieldType.LONG);
- fieldTypeMap.put("date", FieldType.DATE);
- fieldTypeMap.put("float", FieldType.FLOAT);
- fieldTypeMap.put("double", FieldType.DOUBLE);
- fieldTypeMap.put("boolean", FieldType.BOOLEAN);
- elasticsearchSearchTypeMap = Collections.unmodifiableMap(fieldTypeMap);
- }
-
@Override
public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException {
return search(searchRequest, new QueryStringQueryBuilder(searchRequest.getQuery()));
@@ -121,56 +131,139 @@
/**
* Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} for the query.
- * @param searchRequest The request defining the parameters of the search
+ * @param request The request defining the parameters of the search
* @param queryBuilder The actual query to be run. Intended for if the SearchRequest requires wrapping
* @return The results of the query
* @throws InvalidSearchException When the query is malformed or the current state doesn't allow search
*/
- protected SearchResponse search(SearchRequest searchRequest, QueryBuilder queryBuilder) throws InvalidSearchException {
+ protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder) throws InvalidSearchException {
+ org.elasticsearch.action.search.SearchRequest esRequest;
+ org.elasticsearch.action.search.SearchResponse esResponse;
+
if(client == null) {
throw new InvalidSearchException("Uninitialized Dao! You must call init() prior to use.");
}
- if (searchRequest.getSize() > accessConfig.getMaxSearchResults()) {
+
+ if (request.getSize() > accessConfig.getMaxSearchResults()) {
throw new InvalidSearchException("Search result size must be less than " + accessConfig.getMaxSearchResults());
}
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
+
+ esRequest = buildSearchRequest(request, queryBuilder);
+ esResponse = requestSubmitter.submitSearch(esRequest);
+ return buildSearchResponse(request, esResponse);
+ }
+
+ /**
+ * Builds an Elasticsearch search request.
+ * @param searchRequest The Metron search request.
+ * @param queryBuilder
+ * @return An Elasticsearch search request.
+ */
+ private org.elasticsearch.action.search.SearchRequest buildSearchRequest(
+ SearchRequest searchRequest,
+ QueryBuilder queryBuilder) throws InvalidSearchException {
+
+ LOG.debug("Got search request; request={}", ElasticsearchUtils.toJSON(searchRequest).orElse("???"));
+ SearchSourceBuilder searchBuilder = new SearchSourceBuilder()
.size(searchRequest.getSize())
.from(searchRequest.getFrom())
.query(queryBuilder)
.trackScores(true);
- searchRequest.getSort().forEach(sortField -> searchSourceBuilder.sort(sortField.getField(), getElasticsearchSortOrder(sortField.getSortOrder())));
- Optional<List<String>> fields = searchRequest.getFields();
- if (fields.isPresent()) {
- searchSourceBuilder.fields(fields.get());
- } else {
- searchSourceBuilder.fetchSource(true);
- }
- Optional<List<String>> facetFields = searchRequest.getFacetFields();
- if (facetFields.isPresent()) {
- facetFields.get().forEach(field -> searchSourceBuilder.aggregation(new TermsBuilder(getFacentAggregationName(field)).field(field)));
- }
- String[] wildcardIndices = wildcardIndices(searchRequest.getIndices());
- org.elasticsearch.action.search.SearchResponse elasticsearchResponse;
+
+ // column metadata needed to understand the type of each sort field
+ Map<String, FieldType> meta;
try {
- elasticsearchResponse = client.search(new org.elasticsearch.action.search.SearchRequest(wildcardIndices)
- .source(searchSourceBuilder)).actionGet();
- } catch (SearchPhaseExecutionException e) {
- LOG.error("Could not execute search", e);
- throw new InvalidSearchException("Could not execute search", e);
+ meta = getColumnMetadata(searchRequest.getIndices());
+ } catch(IOException e) {
+ throw new InvalidSearchException("Unable to get column metadata", e);
}
+
+ // handle sort fields
+ for(SortField sortField : searchRequest.getSort()) {
+
+ // what type is the sort field?
+ FieldType sortFieldType = meta.getOrDefault(sortField.getField(), FieldType.OTHER);
+
+ // sort order - if ascending missing values sorted last. otherwise, missing values sorted first
+ org.elasticsearch.search.sort.SortOrder sortOrder = getElasticsearchSortOrder(sortField.getSortOrder());
+ String missingSortOrder;
+ if(sortOrder == org.elasticsearch.search.sort.SortOrder.DESC) {
+ missingSortOrder = SORT_MISSING_LAST;
+ } else {
+ missingSortOrder = SORT_MISSING_FIRST;
+ }
+
+ // sort by the field - missing fields always last
+ FieldSortBuilder sortBy = new FieldSortBuilder(sortField.getField())
+ .order(sortOrder)
+ .missing(missingSortOrder)
+ .unmappedType(sortFieldType.getFieldType());
+ searchBuilder.sort(sortBy);
+ }
+
+ // handle search fields
+ if (searchRequest.getFields().isPresent()) {
+ searchBuilder.fields(searchRequest.getFields().get());
+ } else {
+ searchBuilder.fetchSource(true);
+ }
+
+ // handle facet fields
+ if (searchRequest.getFacetFields().isPresent()) {
+ for(String field : searchRequest.getFacetFields().get()) {
+ String name = getFacentAggregationName(field);
+ TermsBuilder terms = new TermsBuilder(name).field(field);
+ searchBuilder.aggregation(terms);
+ }
+ }
+
+ // return the search request
+ String[] indices = wildcardIndices(searchRequest.getIndices());
+ LOG.debug("Built Elasticsearch request; indices={}, request={}", indices, searchBuilder.toString());
+ return new org.elasticsearch.action.search.SearchRequest()
+ .indices(indices)
+ .source(searchBuilder);
+ }
+
+ /**
+ * Builds a search response.
+ *
+ * This effectively transforms an Elasticsearch search response into a Metron search response.
+ *
+ * @param searchRequest The Metron search request.
+ * @param esResponse The Elasticsearch search response.
+ * @return A Metron search response.
+ * @throws InvalidSearchException
+ */
+ private SearchResponse buildSearchResponse(
+ SearchRequest searchRequest,
+ org.elasticsearch.action.search.SearchResponse esResponse) throws InvalidSearchException {
+
SearchResponse searchResponse = new SearchResponse();
- searchResponse.setTotal(elasticsearchResponse.getHits().getTotalHits());
- searchResponse.setResults(Arrays.stream(elasticsearchResponse.getHits().getHits()).map(searchHit ->
- getSearchResult(searchHit, fields.isPresent())).collect(Collectors.toList()));
- if (facetFields.isPresent()) {
+ searchResponse.setTotal(esResponse.getHits().getTotalHits());
+
+ // search hits --> search results
+ List<SearchResult> results = new ArrayList<>();
+ for(SearchHit hit: esResponse.getHits().getHits()) {
+ results.add(getSearchResult(hit, searchRequest.getFields().isPresent()));
+ }
+ searchResponse.setResults(results);
+
+ // handle facet fields
+ if (searchRequest.getFacetFields().isPresent()) {
+ List<String> facetFields = searchRequest.getFacetFields().get();
Map<String, FieldType> commonColumnMetadata;
try {
commonColumnMetadata = getColumnMetadata(searchRequest.getIndices());
} catch (IOException e) {
- throw new InvalidSearchException(String.format("Could not get common column metadata for indices %s", Arrays.toString(searchRequest.getIndices().toArray())));
+ throw new InvalidSearchException(String.format(
+ "Could not get common column metadata for indices %s",
+ Arrays.toString(searchRequest.getIndices().toArray())));
}
- searchResponse.setFacetCounts(getFacetCounts(facetFields.get(), elasticsearchResponse.getAggregations(), commonColumnMetadata ));
+ searchResponse.setFacetCounts(getFacetCounts(facetFields, esResponse.getAggregations(), commonColumnMetadata ));
}
+
+ LOG.debug("Built search response; response={}", ElasticsearchUtils.toJSON(searchResponse).orElse("???"));
return searchResponse;
}
@@ -188,42 +281,76 @@
*/
protected GroupResponse group(GroupRequest groupRequest, QueryBuilder queryBuilder)
throws InvalidSearchException {
+ org.elasticsearch.action.search.SearchRequest esRequest;
+ org.elasticsearch.action.search.SearchResponse esResponse;
+
if (client == null) {
throw new InvalidSearchException("Uninitialized Dao! You must call init() prior to use.");
}
if (groupRequest.getGroups() == null || groupRequest.getGroups().size() == 0) {
throw new InvalidSearchException("At least 1 group must be provided.");
}
- final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
- searchSourceBuilder.query(queryBuilder);
- searchSourceBuilder.aggregation(getGroupsTermBuilder(groupRequest, 0));
- String[] wildcardIndices = wildcardIndices(groupRequest.getIndices());
- org.elasticsearch.action.search.SearchRequest request;
- org.elasticsearch.action.search.SearchResponse response;
- try {
- request = new org.elasticsearch.action.search.SearchRequest(wildcardIndices)
- .source(searchSourceBuilder);
- response = client.search(request).actionGet();
- } catch (SearchPhaseExecutionException e) {
- throw new InvalidSearchException("Could not execute search", e);
- }
+ esRequest = buildGroupRequest(groupRequest, queryBuilder);
+ esResponse = requestSubmitter.submitSearch(esRequest);
+ GroupResponse response = buildGroupResponse(groupRequest, esResponse);
+
+ return response;
+ }
+
+ /**
+ * Builds a group search request.
+ * @param groupRequest The Metron group request.
+ * @param queryBuilder The search query.
+ * @return An Elasticsearch search request.
+ */
+ private org.elasticsearch.action.search.SearchRequest buildGroupRequest(
+ GroupRequest groupRequest,
+ QueryBuilder queryBuilder) {
+
+ // handle groups
+ TermsBuilder groups = getGroupsTermBuilder(groupRequest, 0);
+ final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
+ .query(queryBuilder)
+ .aggregation(groups);
+
+ // return the search request
+ String[] indices = wildcardIndices(groupRequest.getIndices());
+ return new org.elasticsearch.action.search.SearchRequest()
+ .indices(indices)
+ .source(searchSourceBuilder);
+ }
+
+ /**
+ * Build a group response.
+ * @param groupRequest The original group request.
+ * @param response The search response.
+ * @return A group response.
+ * @throws InvalidSearchException
+ */
+ private GroupResponse buildGroupResponse(
+ GroupRequest groupRequest,
+ org.elasticsearch.action.search.SearchResponse response) throws InvalidSearchException {
+
+ // build the search response
Map<String, FieldType> commonColumnMetadata;
try {
commonColumnMetadata = getColumnMetadata(groupRequest.getIndices());
} catch (IOException e) {
- throw new InvalidSearchException(String
- .format("Could not get common column metadata for indices %s",
+ throw new InvalidSearchException(String.format("Could not get common column metadata for indices %s",
Arrays.toString(groupRequest.getIndices().toArray())));
}
+
GroupResponse groupResponse = new GroupResponse();
groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField());
- groupResponse.setGroupResults(
- getGroupResults(groupRequest, 0, response.getAggregations(), commonColumnMetadata));
+ groupResponse.setGroupResults(getGroupResults(groupRequest, 0, response.getAggregations(), commonColumnMetadata));
return groupResponse;
}
private String[] wildcardIndices(List<String> indices) {
+ if(indices == null)
+ return new String[] {};
+
return indices
.stream()
.map(index -> String.format("%s%s*", index, INDEX_NAME_DELIMITER))
@@ -235,26 +362,43 @@
if(this.client == null) {
this.client = ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get(), config.getOptionalSettings());
this.accessConfig = config;
+ this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client.admin());
+ this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client);
+ }
+
+ if(columnMetadataDao == null) {
+ throw new IllegalArgumentException("No ColumnMetadataDao available");
+ }
+
+ if(requestSubmitter == null) {
+ throw new IllegalArgumentException("No ElasticsearchRequestSubmitter available");
}
}
@Override
public Document getLatest(final String guid, final String sensorType) throws IOException {
- Optional<Document> ret = searchByGuid(
- guid
- , sensorType
- , hit -> {
- Long ts = 0L;
- String doc = hit.getSourceAsString();
- String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null);
- try {
- return Optional.of(new Document(doc, guid, sourceType, ts));
- } catch (IOException e) {
- throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
- }
- }
- );
- return ret.orElse(null);
+ Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit));
+ return doc.orElse(null);
+ }
+
+ private Optional<Document> toDocument(final String guid, SearchHit hit) {
+ Long ts = 0L;
+ String doc = hit.getSourceAsString();
+ String sourceType = toSourceType(hit.getType());
+ try {
+ return Optional.of(new Document(doc, guid, sourceType, ts));
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * Returns the source type based on a given doc type.
+ * @param docType The document type.
+ * @return The source type.
+ */
+ private String toSourceType(String docType) {
+ return Iterables.getFirst(Splitter.on("_doc").split(docType), null);
}
@Override
@@ -394,8 +538,7 @@
String type = sensorType + "_doc";
Object ts = update.getTimestamp();
IndexRequest indexRequest = new IndexRequest(indexName, type, update.getGuid())
- .source(update.getDocument())
- ;
+ .source(update.getDocument());
if(ts != null) {
indexRequest = indexRequest.timestamp(ts.toString());
}
@@ -403,77 +546,9 @@
return indexRequest;
}
- @SuppressWarnings("unchecked")
@Override
public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException {
- Map<String, FieldType> indexColumnMetadata = new HashMap<>();
-
- // Keep track of the last index used to inspect a field type so we can print a helpful error message on type mismatch
- Map<String, String> previousIndices = new HashMap<>();
- // If we have detected a field type mismatch, ignore the field going forward since the type has been set to OTHER
- Set<String> fieldBlackList = new HashSet<>();
-
- String[] latestIndices = getLatestIndices(indices);
- if (latestIndices.length > 0) {
- ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = client
- .admin()
- .indices()
- .getMappings(new GetMappingsRequest().indices(latestIndices))
- .actionGet()
- .getMappings();
- for (Object key : mappings.keys().toArray()) {
- String indexName = key.toString();
- ImmutableOpenMap<String, MappingMetaData> mapping = mappings.get(indexName);
- Iterator<String> mappingIterator = mapping.keysIt();
- while (mappingIterator.hasNext()) {
- MappingMetaData mappingMetaData = mapping.get(mappingIterator.next());
- Map<String, Map<String, String>> map = (Map<String, Map<String, String>>) mappingMetaData
- .getSourceAsMap().get("properties");
- for (String field : map.keySet()) {
- if (!fieldBlackList.contains(field)) {
- FieldType type = elasticsearchSearchTypeMap
- .getOrDefault(map.get(field).get("type"), FieldType.OTHER);
- if (indexColumnMetadata.containsKey(field)) {
- FieldType previousType = indexColumnMetadata.get(field);
- if (!type.equals(previousType)) {
- String previousIndexName = previousIndices.get(field);
- LOG.error(String.format(
- "Field type mismatch: %s.%s has type %s while %s.%s has type %s. Defaulting type to %s.",
- indexName, field, type.getFieldType(),
- previousIndexName, field, previousType.getFieldType(),
- FieldType.OTHER.getFieldType()));
- indexColumnMetadata.put(field, FieldType.OTHER);
- // Detected a type mismatch so ignore the field from now on
- fieldBlackList.add(field);
- }
- } else {
- indexColumnMetadata.put(field, type);
- previousIndices.put(field, indexName);
- }
- }
- }
- }
- }
- }
- return indexColumnMetadata;
- }
-
- protected String[] getLatestIndices(List<String> includeIndices) {
- Map<String, String> latestIndices = new HashMap<>();
- String[] indices = client.admin().indices().prepareGetIndex().setFeatures().get().getIndices();
- for (String index : indices) {
- int prefixEnd = index.indexOf(INDEX_NAME_DELIMITER);
- if (prefixEnd != -1) {
- String prefix = index.substring(0, prefixEnd);
- if (includeIndices.contains(prefix)) {
- String latestIndex = latestIndices.get(prefix);
- if (latestIndex == null || index.compareTo(latestIndex) > 0) {
- latestIndices.put(prefix, index);
- }
- }
- }
- }
- return latestIndices.values().toArray(new String[latestIndices.size()]);
+ return columnMetadataDao.getColumnMetadata(indices);
}
private org.elasticsearch.search.sort.SortOrder getElasticsearchSortOrder(
@@ -588,4 +663,19 @@
private String getSumAggregationName(String field) {
return String.format("%s_score", field);
}
+
+ public ElasticsearchDao client(TransportClient client) {
+ this.client = client;
+ return this;
+ }
+
+ public ElasticsearchDao columnMetadataDao(ColumnMetadataDao columnMetadataDao) {
+ this.columnMetadataDao = columnMetadataDao;
+ return this;
+ }
+
+ public ElasticsearchDao accessConfig(AccessConfig accessConfig) {
+ this.accessConfig = accessConfig;
+ return this;
+ }
}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
new file mode 100644
index 0000000..0e0df21
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.elasticsearch.action.search.SearchPhaseExecutionException;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.rest.RestStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+
+/**
+ * Responsible for submitting requests to Elasticsearch.
+ */
+public class ElasticsearchRequestSubmitter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ /**
+ * The Elasticsearch client.
+ */
+ private TransportClient client;
+
+ public ElasticsearchRequestSubmitter(TransportClient client) {
+ this.client = client;
+ }
+
+ /**
+ * Submit a search to Elasticsearch.
+ * @param request A search request.
+ * @return The search response.
+ */
+ public SearchResponse submitSearch(SearchRequest request) throws InvalidSearchException {
+ LOG.debug("About to submit a search; request={}", ElasticsearchUtils.toJSON(request).orElse("???"));
+
+ // submit the search request
+ org.elasticsearch.action.search.SearchResponse esResponse;
+ try {
+ esResponse = client
+ .search(request)
+ .actionGet();
+ LOG.debug("Got Elasticsearch response; response={}", esResponse.toString());
+
+ } catch (SearchPhaseExecutionException e) {
+ String msg = String.format(
+ "Failed to execute search; error='%s', search='%s'",
+ ExceptionUtils.getRootCauseMessage(e),
+ ElasticsearchUtils.toJSON(request).orElse("???"));
+ LOG.error(msg, e);
+ throw new InvalidSearchException(msg, e);
+ }
+
+ // check for shard failures
+ if(esResponse.getFailedShards() > 0) {
+ handleShardFailures(request, esResponse);
+ }
+
+ // validate the response status
+ if(RestStatus.OK == esResponse.status()) {
+ return esResponse;
+
+ } else {
+ // the search was not successful
+ String msg = String.format(
+ "Bad search response; status=%s, timeout=%s, terminatedEarly=%s",
+ esResponse.status(), esResponse.isTimedOut(), esResponse.isTerminatedEarly());
+ LOG.error(msg);
+ throw new InvalidSearchException(msg);
+ }
+ }
+
+ /**
+ * Handle individual shard failures that can occur even when the response is OK. These
+ * can indicate misconfiguration of the search indices.
+ * @param request The search request.
+ * @param response The search response.
+ */
+ private void handleShardFailures(
+ org.elasticsearch.action.search.SearchRequest request,
+ org.elasticsearch.action.search.SearchResponse response) {
+ /*
+ * shard failures are only logged. the search itself is not failed. this approach
+ * assumes that a user is interested in partial search results, even if the
+ * entire search result set cannot be produced.
+ *
+ * for example, assume the user adds an additional sensor and the telemetry
+ * is indexed into a new search index. if that search index is misconfigured,
+ * it can result in partial shard failures. rather than failing the entire search,
+ * we log the error and allow the results to be returned from shards that
+ * are correctly configured.
+ */
+ int errors = ArrayUtils.getLength(response.getShardFailures());
+ LOG.error("Search resulted in {}/{} shards failing; errors={}, search={}",
+ response.getFailedShards(),
+ response.getTotalShards(),
+ errors,
+ ElasticsearchUtils.toJSON(request).orElse("???"));
+
+ // log each reported failure
+ int failureCount=1;
+ for(ShardSearchFailure fail: response.getShardFailures()) {
+ String msg = String.format(
+ "Shard search failure [%s/%s]; reason=%s, index=%s, shard=%s, status=%s, nodeId=%s",
+ failureCount,
+ errors,
+ ExceptionUtils.getRootCauseMessage(fail.getCause()),
+ fail.index(),
+ fail.shardId(),
+ fail.status(),
+ fail.shard().getNodeId());
+ LOG.error(msg, fail.getCause());
+ }
+ }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
index 4c9933b..f29012a 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
@@ -22,10 +22,15 @@
import com.google.common.collect.Iterables;
import org.apache.commons.lang.StringUtils;
import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.codehaus.jackson.map.ObjectMapper;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.lang.invoke.MethodHandles;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
@@ -35,11 +40,14 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static java.lang.String.format;
public class ElasticsearchUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
private static ThreadLocal<Map<String, SimpleDateFormat>> DATE_FORMAT_CACHE
= ThreadLocal.withInitial(() -> new HashMap<>());
@@ -179,4 +187,48 @@
}
throw new IllegalStateException("Unable to read the elasticsearch ips, expected es.ip to be either a list of strings, a string hostname or a host:port string");
}
+
+ /**
+ * Converts an Elasticsearch SearchRequest to JSON.
+ * @param esRequest The search request.
+ * @return The JSON representation of the SearchRequest.
+ */
+ public static Optional<String> toJSON(org.elasticsearch.action.search.SearchRequest esRequest) {
+ Optional<String> json = Optional.empty();
+
+ if(esRequest != null) {
+ try {
+ json = Optional.of(XContentHelper.convertToJson(esRequest.source(), true));
+
+ } catch (Throwable t) {
+ LOG.error("Failed to convert search request to JSON", t);
+ }
+ }
+
+ return json;
+ }
+
+ /**
+ * Convert a SearchRequest to JSON.
+ * @param request The search request.
+ * @return The JSON representation of the SearchRequest.
+ */
+ public static Optional<String> toJSON(Object request) {
+ Optional<String> json = Optional.empty();
+
+ if(request != null) {
+ try {
+ json = Optional.of(
+ new ObjectMapper()
+ .writer()
+ .withDefaultPrettyPrinter()
+ .writeValueAsString(request));
+
+ } catch (Throwable t) {
+ LOG.error("Failed to convert request to JSON", t);
+ }
+ }
+
+ return json;
+ }
}
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
new file mode 100644
index 0000000..0a83ee0
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder;
+import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
+import org.elasticsearch.client.AdminClient;
+import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the ElasticsearchColumnMetadata class.
+ */
+public class ElasticsearchColumnMetadataDaoTest {
+
+ /**
+ * @param indices The names of all indices that will exist.
+ * @return An object to test.
+ */
+ public ElasticsearchColumnMetadataDao setup(String[] indices) {
+ return setup(indices, ImmutableOpenMap.of());
+ }
+
+ /**
+ * @param indices The names of all indices that will exist.
+ * @param mappings The index mappings.
+ * @return An object to test.
+ */
+ public ElasticsearchColumnMetadataDao setup(
+ String[] indices,
+ ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings) {
+
+ AdminClient adminClient = mock(AdminClient.class);
+ IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
+ GetIndexRequestBuilder getIndexRequestBuilder = mock(GetIndexRequestBuilder.class);
+ GetIndexResponse getIndexResponse = mock(GetIndexResponse.class);
+ ActionFuture getMappingsActionFuture = mock(ActionFuture.class);
+ GetMappingsResponse getMappingsResponse = mock(GetMappingsResponse.class);
+
+ // setup the mocks so that a set of indices are available to the DAO
+ when(adminClient.indices()).thenReturn(indicesAdminClient);
+ when(indicesAdminClient.prepareGetIndex()).thenReturn(getIndexRequestBuilder);
+ when(getIndexRequestBuilder.setFeatures()).thenReturn(getIndexRequestBuilder);
+ when(getIndexRequestBuilder.get()).thenReturn(getIndexResponse);
+ when(getIndexResponse.getIndices()).thenReturn(indices);
+
+ // setup the mocks so that a set of mappings are available to the DAO
+ when(indicesAdminClient.getMappings(any())).thenReturn(getMappingsActionFuture);
+ when(getMappingsActionFuture.actionGet()).thenReturn(getMappingsResponse);
+ when(getMappingsResponse.getMappings()).thenReturn(mappings);
+
+ return new ElasticsearchColumnMetadataDao(adminClient);
+ }
+
+ @Test
+ public void testGetOneLatestIndex() {
+
+ // setup
+ String[] existingIndices = new String[] {
+ "bro_index_2017.10.03.19",
+ "bro_index_2017.10.03.20",
+ "bro_index_2017.10.03.21",
+ "snort_index_2017.10.03.19",
+ "snort_index_2017.10.03.20",
+ "snort_index_2017.10.03.21"
+ };
+ ElasticsearchColumnMetadataDao dao = setup(existingIndices);
+
+ // get the latest indices
+ List<String> args = Collections.singletonList("bro");
+ String[] actual = dao.getLatestIndices(args);
+
+ // validation
+ String [] expected = new String[] { "bro_index_2017.10.03.21" };
+ assertArrayEquals(expected, actual);
+ }
+
+ @Test
+ public void testGetLatestIndices() {
+ // setup
+ String[] existingIndices = new String[] {
+ "bro_index_2017.10.03.19",
+ "bro_index_2017.10.03.20",
+ "bro_index_2017.10.03.21",
+ "snort_index_2017.10.03.19",
+ "snort_index_2017.10.03.19",
+ "snort_index_2017.10.03.21"
+ };
+ ElasticsearchColumnMetadataDao dao = setup(existingIndices);
+
+ // get the latest indices
+ List<String> args = Arrays.asList("bro", "snort");
+ String[] actual = dao.getLatestIndices(args);
+
+ // validation
+ String [] expected = new String[] { "bro_index_2017.10.03.21", "snort_index_2017.10.03.21" };
+ assertArrayEquals(expected, actual);
+ }
+
+ @Test
+ public void testLatestIndicesWhereNoneExist() {
+
+ // setup - there are no existing indices
+ String[] existingIndices = new String[] {};
+ ElasticsearchColumnMetadataDao dao = setup(existingIndices);
+
+ // get the latest indices
+ List<String> args = Arrays.asList("bro", "snort");
+ String[] actual = dao.getLatestIndices(args);
+
+ // validation
+ String [] expected = new String[] {};
+ assertArrayEquals(expected, actual);
+ }
+}
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
index 7c33018..a6c0aa6 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
@@ -17,98 +17,209 @@
*/
package org.apache.metron.elasticsearch.dao;
-import org.apache.metron.elasticsearch.matcher.SearchRequestMatcher;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
import org.apache.metron.indexing.dao.AccessConfig;
-import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.search.*;
-import org.elasticsearch.action.ActionFuture;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.search.SortField;
+import org.apache.metron.indexing.dao.search.SortOrder;
import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
-import org.junit.Assert;
-import org.junit.Before;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
import org.junit.Test;
-import org.mockito.Mock;
+import org.mockito.ArgumentCaptor;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class ElasticsearchDaoTest {
- private IndexDao searchService;
+ private ElasticsearchDao dao;
+ private ElasticsearchRequestSubmitter requestSubmitter;
- @Mock
- TransportClient client;
+ private void setup(RestStatus status, int maxSearchResults, Map<String, FieldType> metadata) throws Exception {
- @Before
- public void setUp() throws Exception {
- client = mock(TransportClient.class);
+ // setup the mock search hits
+ SearchHit hit1 = mock(SearchHit.class);
+ when(hit1.getId()).thenReturn("id1");
+ when(hit1.getSource()).thenReturn(new HashMap<String, Object>(){{ put("field", "value1"); }});
+ when(hit1.getScore()).thenReturn(0.1f);
+
+ SearchHit hit2 = mock(SearchHit.class);
+ when(hit2.getId()).thenReturn("id2");
+ when(hit2.getSource()).thenReturn(new HashMap<String, Object>(){{ put("field", "value2"); }});
+ when(hit2.getScore()).thenReturn(0.2f);
+
+ // search hits
+ SearchHit[] hits = { hit1, hit2 };
+ SearchHits searchHits = mock(SearchHits.class);
+ when(searchHits.getHits()).thenReturn(hits);
+ when(searchHits.getTotalHits()).thenReturn(Integer.toUnsignedLong(hits.length));
+
+ // search response which returns the search hits
+ org.elasticsearch.action.search.SearchResponse response = mock(org.elasticsearch.action.search.SearchResponse.class);
+ when(response.status()).thenReturn(status);
+ when(response.getHits()).thenReturn(searchHits);
+
+ // provides column metadata
+ ColumnMetadataDao columnMetadataDao = mock(ColumnMetadataDao.class);
+ when(columnMetadataDao.getColumnMetadata(any())).thenReturn(metadata);
+
+ // returns the search response
+ requestSubmitter = mock(ElasticsearchRequestSubmitter.class);
+ when(requestSubmitter.submitSearch(any())).thenReturn(response);
+
+ TransportClient client = mock(TransportClient.class);
+
+ // provides configuration
AccessConfig config = mock(AccessConfig.class);
- when(config.getMaxSearchResults()).thenReturn(50);
- searchService = new ElasticsearchDao(client, config);
+ when(config.getMaxSearchResults()).thenReturn(maxSearchResults);
+
+ dao = new ElasticsearchDao(client, columnMetadataDao, requestSubmitter, config);
+ }
+
+ private void setup(RestStatus status, int maxSearchResults) throws Exception {
+ setup(status, maxSearchResults, new HashMap<>());
}
@Test
- public void searchShouldProperlyBuildSearchRequest() throws Exception {
+ public void searchShouldSortByGivenFields() throws Exception {
- // setup the mock client
- SearchHit searchHit1 = mock(SearchHit.class);
- when(searchHit1.getId()).thenReturn("id1");
- when(searchHit1.getSource()).thenReturn(new HashMap<String, Object>(){{ put("field", "value1"); }});
- when(searchHit1.getScore()).thenReturn(0.1f);
+ // setup the column metadata
+ Map<String, FieldType> columnMetadata = new HashMap<>();
+ columnMetadata.put("sortByStringDesc", FieldType.STRING);
+ columnMetadata.put("sortByIntAsc", FieldType.INTEGER);
- SearchHit searchHit2 = mock(SearchHit.class);
- when(searchHit2.getId()).thenReturn("id2");
- when(searchHit2.getSource()).thenReturn(new HashMap<String, Object>(){{ put("field", "value2"); }});
- when(searchHit2.getScore()).thenReturn(0.2f);
-
- SearchHits searchHits = mock(SearchHits.class);
- when(searchHits.getHits()).thenReturn(new SearchHit[]{searchHit1, searchHit2});
- when(searchHits.getTotalHits()).thenReturn(2L);
-
- org.elasticsearch.action.search.SearchResponse elasticsearchResponse = mock(org.elasticsearch.action.search.SearchResponse.class);
- when(elasticsearchResponse.getHits()).thenReturn(searchHits);
-
- ActionFuture actionFuture = mock(ActionFuture.class);
- when(actionFuture.actionGet()).thenReturn(elasticsearchResponse);
- when(client.search(any())).thenReturn(actionFuture);
+ // setup the dao
+ setup(RestStatus.OK, 25, columnMetadata);
// "sort by" fields for the search request
- SortField[] sortFields = {
- sortBy("sortField1", SortOrder.DESC),
- sortBy("sortField2", SortOrder.ASC)
+ SortField[] expectedSortFields = {
+ sortBy("sortByStringDesc", SortOrder.DESC),
+ sortBy("sortByIntAsc", SortOrder.ASC),
+ sortBy("sortByUndefinedDesc", SortOrder.DESC)
};
- // create a search request
+ // create a metron search request
+ final List<String> indices = Arrays.asList("bro", "snort");
SearchRequest searchRequest = new SearchRequest();
searchRequest.setSize(2);
- searchRequest.setIndices(Arrays.asList("bro", "snort"));
+ searchRequest.setIndices(indices);
searchRequest.setFrom(5);
- searchRequest.setSort(Arrays.asList(sortFields));
+ searchRequest.setSort(Arrays.asList(expectedSortFields));
searchRequest.setQuery("some query");
- // submit the search request
- SearchResponse searchResponse = searchService.search(searchRequest);
+ // submit the metron search request
+ SearchResponse searchResponse = dao.search(searchRequest);
+ assertNotNull(searchResponse);
- // validate
- String[] expectedIndices = {"bro_index*", "snort_index*"};
- verify(client).search(argThat(new SearchRequestMatcher(expectedIndices, "some query", 2, 5, sortFields)));
- assertEquals(2, searchResponse.getTotal());
- List<SearchResult> actualSearchResults = searchResponse.getResults();
- assertEquals(2, actualSearchResults.size());
- assertEquals("id1", actualSearchResults.get(0).getId());
- assertEquals("value1", actualSearchResults.get(0).getSource().get("field"));
- assertEquals(0.1f, actualSearchResults.get(0).getScore(), 0.0f);
- assertEquals("id2", actualSearchResults.get(1).getId());
- assertEquals("value2", actualSearchResults.get(1).getSource().get("field"));
- assertEquals(0.2f, actualSearchResults.get(1).getScore(), 0.0f);
- verifyNoMoreInteractions(client);
+ // capture the elasticsearch search request that was created
+ ArgumentCaptor<org.elasticsearch.action.search.SearchRequest> argument = ArgumentCaptor.forClass(org.elasticsearch.action.search.SearchRequest.class);
+ verify(requestSubmitter).submitSearch(argument.capture());
+ org.elasticsearch.action.search.SearchRequest request = argument.getValue();
+
+ // transform the request to JSON for validation
+ JSONParser parser = new JSONParser();
+ JSONObject json = (JSONObject) parser.parse(ElasticsearchUtils.toJSON(request).orElse("???"));
+
+ // validate the sort fields
+ JSONArray sortFields = (JSONArray) json.get("sort");
+ assertEquals(3, sortFields.size());
+
+ {
+ // sort by string descending
+ JSONObject aSortField = (JSONObject) sortFields.get(0);
+ JSONObject sortBy = (JSONObject) aSortField.get("sortByStringDesc");
+ assertEquals("desc", sortBy.get("order"));
+ assertEquals("_last", sortBy.get("missing"));
+ assertEquals("string", sortBy.get("unmapped_type"));
+ }
+ {
+ // sort by integer ascending
+ JSONObject aSortField = (JSONObject) sortFields.get(1);
+ JSONObject sortByIntAsc = (JSONObject) aSortField.get("sortByIntAsc");
+ assertEquals("asc", sortByIntAsc.get("order"));
+ assertEquals("_first", sortByIntAsc.get("missing"));
+ assertEquals("integer", sortByIntAsc.get("unmapped_type"));
+ }
+ {
+ // sort by unknown type
+ JSONObject aSortField = (JSONObject) sortFields.get(2);
+ JSONObject sortByUndefinedDesc = (JSONObject) aSortField.get("sortByUndefinedDesc");
+ assertEquals("desc", sortByUndefinedDesc.get("order"));
+ assertEquals("_last", sortByUndefinedDesc.get("missing"));
+ assertEquals("other", sortByUndefinedDesc.get("unmapped_type"));
+ }
+ }
+
+ @Test
+ public void searchShouldWildcardIndices() throws Exception {
+
+ // setup the dao
+ setup(RestStatus.OK, 25);
+
+ // "sort by" fields for the search request
+ SortField[] expectedSortFields = {
+ sortBy("sortByStringDesc", SortOrder.DESC),
+ sortBy("sortByIntAsc", SortOrder.ASC),
+ sortBy("sortByUndefinedDesc", SortOrder.DESC)
+ };
+
+ // create a metron search request
+ final List<String> indices = Arrays.asList("bro", "snort");
+ SearchRequest searchRequest = new SearchRequest();
+ searchRequest.setSize(2);
+ searchRequest.setIndices(indices);
+ searchRequest.setFrom(5);
+ searchRequest.setSort(Arrays.asList(expectedSortFields));
+ searchRequest.setQuery("some query");
+
+ // submit the metron search request
+ SearchResponse searchResponse = dao.search(searchRequest);
+ assertNotNull(searchResponse);
+
+ // capture the elasticsearch search request that was created
+ ArgumentCaptor<org.elasticsearch.action.search.SearchRequest> argument = ArgumentCaptor.forClass(org.elasticsearch.action.search.SearchRequest.class);
+ verify(requestSubmitter).submitSearch(argument.capture());
+ org.elasticsearch.action.search.SearchRequest request = argument.getValue();
+
+ // transform the request to JSON for validation
+ JSONParser parser = new JSONParser();
+ JSONObject json = (JSONObject) parser.parse(ElasticsearchUtils.toJSON(request).orElse("???"));
+
+ // ensure that the index names are 'wildcard-ed'
+ String[] expected = { "bro_index*", "snort_index*" };
+ assertArrayEquals(expected, request.indices());
+ }
+
+
+ @Test(expected = InvalidSearchException.class)
+ public void searchShouldThrowExceptionWhenMaxResultsAreExceeded() throws Exception {
+
+ int maxSearchResults = 20;
+ setup(RestStatus.OK, maxSearchResults);
+
+ SearchRequest searchRequest = new SearchRequest();
+ searchRequest.setSize(maxSearchResults+1);
+
+ dao.search(searchRequest);
+ // exception expected - size > max
}
private SortField sortBy(String field, SortOrder order) {
@@ -118,19 +229,4 @@
return sortField;
}
- @Test
- public void searchShouldThrowExceptionWhenMaxResultsAreExceeded() throws Exception {
- SearchRequest searchRequest = new SearchRequest();
- searchRequest.setSize(51);
- try {
- searchService.search(searchRequest);
- Assert.fail("Did not throw expected exception");
- }
- catch(InvalidSearchException ise) {
- Assert.assertEquals("Search result size must be less than 50", ise.getMessage());
- }
- }
-
-
-
}
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
new file mode 100644
index 0000000..26f5fff
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchShardTarget;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ElasticsearchRequestSubmitterTest {
+
+ private ElasticsearchRequestSubmitter submitter;
+
+ public ElasticsearchRequestSubmitter setup(SearchResponse response) {
+
+ // mocks
+ TransportClient client = mock(TransportClient.class);
+ ActionFuture future = Mockito.mock(ActionFuture.class);
+
+ // the client should return the given search response
+ when(client.search(any())).thenReturn(future);
+ when(future.actionGet()).thenReturn(response);
+
+ return new ElasticsearchRequestSubmitter(client);
+ }
+
+ @Test
+ public void searchShouldSucceedWhenOK() throws InvalidSearchException {
+
+ // mocks
+ SearchResponse response = mock(SearchResponse.class);
+ SearchRequest request = mock(SearchRequest.class);
+
+ // response will have status of OK and no failed shards
+ when(response.status()).thenReturn(RestStatus.OK);
+ when(response.getFailedShards()).thenReturn(0);
+ when(response.getTotalShards()).thenReturn(2);
+
+ // search should succeed
+ ElasticsearchRequestSubmitter submitter = setup(response);
+ SearchResponse actual = submitter.submitSearch(request);
+ assertNotNull(actual);
+ }
+
+ @Test(expected = InvalidSearchException.class)
+ public void searchShouldFailWhenNotOK() throws InvalidSearchException {
+
+ // mocks
+ SearchResponse response = mock(SearchResponse.class);
+ SearchRequest request = mock(SearchRequest.class);
+
+ // response will have status of OK
+ when(response.status()).thenReturn(RestStatus.PARTIAL_CONTENT);
+ when(response.getFailedShards()).thenReturn(0);
+ when(response.getTotalShards()).thenReturn(2);
+
+ // search should succeed
+ ElasticsearchRequestSubmitter submitter = setup(response);
+ submitter.submitSearch(request);
+ }
+
+ @Test
+ public void searchShouldHandleShardFailure() throws InvalidSearchException {
+ // mocks
+ SearchResponse response = mock(SearchResponse.class);
+ SearchRequest request = mock(SearchRequest.class);
+ ShardSearchFailure fail = mock(ShardSearchFailure.class);
+ SearchShardTarget target = mock(SearchShardTarget.class);
+
+ // response will have status of OK
+ when(response.status()).thenReturn(RestStatus.OK);
+
+ // the response will report shard failures
+ when(response.getFailedShards()).thenReturn(1);
+ when(response.getTotalShards()).thenReturn(2);
+
+ // the response will return the failures
+ ShardSearchFailure[] failures = { fail };
+ when(response.getShardFailures()).thenReturn(failures);
+
+ // shard failure needs to report the node
+ when(fail.shard()).thenReturn(target);
+ when(target.getNodeId()).thenReturn("node1");
+
+ // shard failure needs to report details of failure
+ when(fail.index()).thenReturn("bro_index_2017-10-11");
+ when(fail.shardId()).thenReturn(1);
+
+ // search should succeed, even with failed shards
+ ElasticsearchRequestSubmitter submitter = setup(response);
+ SearchResponse actual = submitter.submitSearch(request);
+ assertNotNull(actual);
+ }
+}
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
index 07cc708..3d50e99 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
@@ -17,18 +17,11 @@
*/
package org.apache.metron.elasticsearch.integration;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.concurrent.ExecutionException;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
-import org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao;
import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
import org.apache.metron.indexing.dao.AccessConfig;
import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.MetaAlertDao;
import org.apache.metron.indexing.dao.SearchIntegrationTest;
import org.apache.metron.integration.InMemoryComponent;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -43,7 +36,13 @@
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutionException;
+
public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest {
+
private static String indexDir = "target/elasticsearch_search";
private static String dateFormat = "yyyy.MM.dd.HH";
private static final int MAX_RETRIES = 10;
@@ -53,19 +52,46 @@
* {
* "bro_doc": {
* "properties": {
- * "source:type": { "type": "string" },
- * "ip_src_addr": { "type": "ip" },
- * "ip_src_port": { "type": "integer" },
- * "long_field": { "type": "long" },
- * "timestamp" : { "type": "date" },
- * "latitude" : { "type": "float" },
- * "score": { "type": "double" },
- * "is_alert": { "type": "boolean" },
- * "location_point": { "type": "geo_point" },
- * "bro_field": { "type": "string" },
- * "duplicate_name_field": { "type": "string" }
+ * "source:type": {
+ * "type": "string",
+ * "index": "not_analyzed"
+ * },
+ * "ip_src_addr": {
+ * "type": "ip"
+ * },
+ * "ip_src_port": {
+ * "type": "integer"
+ * },
+ * "long_field": {
+ * "type": "long"
+ * },
+ * "timestamp": {
+ * "type": "date",
+ * "format": "epoch_millis"
+ * },
+ * "latitude" : {
+ * "type": "float"
+ * },
+ * "score": {
+ * "type": "double"
+ * },
+ * "is_alert": {
+ * "type": "boolean"
+ * },
+ * "location_point": {
+ * "type": "geo_point"
+ * },
+ * "bro_field": {
+ * "type": "string"
+ * },
+ * "duplicate_name_field": {
+ * "type": "string"
+ * },
+ * "alert": {
+ * "type": "nested"
+ * }
* }
- * }
+ * }
* }
*/
@Multiline
@@ -73,21 +99,51 @@
/**
* {
- * "snort_doc": {
- * "properties": {
- * "source:type": { "type": "string" },
- * "ip_src_addr": { "type": "ip" },
- * "ip_src_port": { "type": "integer" },
- * "long_field": { "type": "long" },
- * "timestamp" : { "type": "date" },
- * "latitude" : { "type": "float" },
- * "score": { "type": "double" },
- * "is_alert": { "type": "boolean" },
- * "location_point": { "type": "geo_point" },
- * "snort_field": { "type": "integer" },
- * "duplicate_name_field": { "type": "integer" }
- * }
- * }
+ * "snort_doc": {
+ * "properties": {
+ * "source:type": {
+ * "type": "string",
+ * "index": "not_analyzed"
+ * },
+ * "ip_src_addr": {
+ * "type": "ip"
+ * },
+ * "ip_src_port": {
+ * "type": "integer"
+ * },
+ * "long_field": {
+ * "type": "long"
+ * },
+ * "timestamp": {
+ * "type": "date",
+ * "format": "epoch_millis"
+ * },
+ * "latitude" : {
+ * "type": "float"
+ * },
+ * "score": {
+ * "type": "double"
+ * },
+ * "is_alert": {
+ * "type": "boolean"
+ * },
+ * "location_point": {
+ * "type": "geo_point"
+ * },
+ * "snort_field": {
+ * "type": "integer"
+ * },
+ * "duplicate_name_field": {
+ * "type": "integer"
+ * },
+ * "alert": {
+ * "type": "nested"
+ * },
+ * "threat:triage:score": {
+ * "type": "float"
+ * }
+ * }
+ * }
* }
*/
@Multiline
@@ -106,27 +162,23 @@
@Multiline
private static String metaAlertTypeMappings;
-
@Override
protected IndexDao createDao() throws Exception {
- IndexDao elasticsearchDao = new ElasticsearchDao();
- elasticsearchDao.init(
- new AccessConfig() {{
- setMaxSearchResults(100);
- setMaxSearchGroups(100);
- setGlobalConfigSupplier( () ->
- new HashMap<String, Object>() {{
- put("es.clustername", "metron");
- put("es.port", "9300");
- put("es.ip", "localhost");
- put("es.date.format", dateFormat);
- }}
- );
+ AccessConfig config = new AccessConfig();
+ config.setMaxSearchResults(100);
+ config.setMaxSearchGroups(100);
+ config.setGlobalConfigSupplier( () ->
+ new HashMap<String, Object>() {{
+ put("es.clustername", "metron");
+ put("es.port", "9300");
+ put("es.ip", "localhost");
+ put("es.date.format", dateFormat);
}}
);
- MetaAlertDao ret = new ElasticsearchMetaAlertDao();
- ret.init(elasticsearchDao);
- return elasticsearchDao;
+
+ IndexDao dao = new ElasticsearchDao();
+ dao.init(config);
+ return dao;
}
@Override
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/matcher/SearchRequestMatcher.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/matcher/SearchRequestMatcher.java
deleted file mode 100644
index 417e48b..0000000
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/matcher/SearchRequestMatcher.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.elasticsearch.matcher;
-
-import org.apache.metron.indexing.dao.search.SortField;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.index.query.QueryStringQueryBuilder;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.search.sort.FieldSortBuilder;
-import org.elasticsearch.search.sort.SortOrder;
-import org.hamcrest.Description;
-import org.mockito.ArgumentMatcher;
-
-import java.util.Arrays;
-
-public class SearchRequestMatcher extends ArgumentMatcher<SearchRequest> {
-
- private String[] expectedIndices;
- private String[] actualIndices;
-
- private BytesReference expectedSource;
- private BytesReference actualSource;
-
- private boolean indicesMatch;
- private boolean sourcesMatch;
-
- public SearchRequestMatcher(String[] indices, String query, int size, int from, SortField[] sortFields) {
- expectedIndices = indices;
- SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
- .size(size)
- .from(from)
- .query(new QueryStringQueryBuilder(query))
- .fetchSource(true)
- .trackScores(true);
- for(SortField sortField: sortFields) {
- FieldSortBuilder fieldSortBuilder = new FieldSortBuilder(sortField.getField());
- fieldSortBuilder.order(sortField.getSortOrder() == org.apache.metron.indexing.dao.search.SortOrder.DESC ? SortOrder.DESC : SortOrder.ASC);
- searchSourceBuilder = searchSourceBuilder.sort(fieldSortBuilder);
- }
- expectedSource = searchSourceBuilder.buildAsBytes(Requests.CONTENT_TYPE);
- }
-
- @Override
- public boolean matches(Object o) {
- SearchRequest searchRequest = (SearchRequest) o;
-
- actualIndices = searchRequest.indices();
- actualSource = searchRequest.source();
-
- indicesMatch = Arrays.equals(expectedIndices, actualIndices);
- sourcesMatch = expectedSource.equals(actualSource);
-
- return indicesMatch && sourcesMatch;
- }
-
- @Override
- public void describeTo(Description description) {
- if(!indicesMatch) {
- description.appendText("Bad search request indices: ");
- description.appendText(" expected=");
- description.appendValue(expectedIndices);
- description.appendText(", got=");
- description.appendValue(actualIndices);
- description.appendText(" ");
- }
-
- if(!sourcesMatch) {
- description.appendText("Bad search request sources: ");
- description.appendText(" expected=");
- description.appendValue(expectedSource);
- description.appendText(", got=");
- description.appendValue(actualSource);
- description.appendText(" ");
- }
- }
-}
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
index 4f47a65..c16401e 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
@@ -31,8 +31,7 @@
private TableProvider tableProvider = null;
/**
- * A supplier which will return the current global config.
- * @return
+ * @return A supplier which will return the current global config.
*/
public Supplier<Map<String, Object>> getGlobalConfigSupplier() {
return globalConfigSupplier;
@@ -43,8 +42,7 @@
}
/**
- * The maximum search result.
- * @return
+ * @return The maximum number of search results.
*/
public Integer getMaxSearchResults() {
return maxSearchResults;
@@ -55,8 +53,7 @@
}
/**
- * The maximum search groups.
- * @return
+ * @return The maximum number of search groups.
*/
public Integer getMaxSearchGroups() {
return maxSearchGroups;
@@ -67,8 +64,7 @@
}
/**
- * Get optional settings for initializing indices.
- * @return
+ * @return Optional settings for initializing indices.
*/
public Map<String, String> getOptionalSettings() {
return optionalSettings;
@@ -79,8 +75,7 @@
}
/**
- * Return the table provider to use for NoSql DAOs
- * @return
+ * @return The table provider to use for NoSql DAOs
*/
public TableProvider getTableProvider() {
return tableProvider;
@@ -89,5 +84,4 @@
public void setTableProvider(TableProvider tableProvider) {
this.tableProvider = tableProvider;
}
-
}
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
index f2108de..002ec28 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
@@ -21,14 +21,34 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Iterables;
-import java.util.Map.Entry;
+import com.google.common.collect.Ordering;
import org.apache.metron.common.Constants;
import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.indexing.dao.search.*;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.search.Group;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
+import org.apache.metron.indexing.dao.search.GroupResult;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.search.SearchResult;
+import org.apache.metron.indexing.dao.search.SortField;
+import org.apache.metron.indexing.dao.search.SortOrder;
import org.apache.metron.indexing.dao.update.Document;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.UUID;
public class InMemoryDao implements IndexDao {
// Map from index to list of documents as JSON strings
@@ -102,13 +122,14 @@
private static class ComparableComparator implements Comparator<Comparable> {
SortOrder order = null;
+
public ComparableComparator(SortOrder order) {
this.order = order;
}
@Override
public int compare(Comparable o1, Comparable o2) {
- int result = ComparisonChain.start().compare(o1, o2).result();
- return order == SortOrder.ASC?result:-1*result;
+ int result = ComparisonChain.start().compare(o1, o2, Ordering.natural().nullsLast()).result();
+ return order == SortOrder.ASC ? result : -1*result;
}
}
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
index 8f32946..4d3ff9b 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
@@ -18,31 +18,31 @@
package org.apache.metron.indexing.dao;
import com.fasterxml.jackson.core.type.TypeReference;
-import java.util.Iterator;
-import java.util.Optional;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.indexing.dao.search.FieldType;
import org.apache.metron.indexing.dao.search.GetRequest;
import org.apache.metron.indexing.dao.search.GroupRequest;
import org.apache.metron.indexing.dao.search.GroupResponse;
+import org.apache.metron.indexing.dao.search.GroupResult;
import org.apache.metron.indexing.dao.search.InvalidSearchException;
import org.apache.metron.indexing.dao.search.SearchRequest;
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.search.SearchResult;
-import org.apache.metron.indexing.dao.search.GroupResult;
import org.apache.metron.indexing.dao.update.Document;
import org.apache.metron.integration.InMemoryComponent;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.junit.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
public abstract class SearchIntegrationTest {
/**
@@ -59,8 +59,8 @@
/**
* [
- * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "score": 50.0, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "duplicate_name_field": 1, "guid":"snort_1"},
- * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "duplicate_name_field": 2, "guid":"snort_2"},
+ * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "score": 50.0, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "duplicate_name_field": 1, "guid":"snort_1", "threat:triage:score":"10"},
+ * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "duplicate_name_field": 2, "guid":"snort_2", "threat:triage:score":"20"},
* {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "score": 20.0, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, "duplicate_name_field": 3, "guid":"snort_3"},
* {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "score": 50.0, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, "duplicate_name_field": 4, "guid":"snort_4"},
* {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "score": 10.0, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, "duplicate_name_field": 5, "guid":"snort_5"}
@@ -155,6 +155,46 @@
/**
* {
+ * "indices": [
+ * "snort",
+ * "bro"
+ * ],
+ * "query": "*",
+ * "from": 0,
+ * "size": 25,
+ * "sort": [
+ * {
+ * "field": "threat:triage:score",
+ * "sortOrder": "asc"
+ * }
+ * ]
+ * }
+ */
+ @Multiline
+ public static String sortAscendingWithMissingFields;
+
+ /**
+ * {
+ * "indices": [
+ * "snort",
+ * "bro"
+ * ],
+ * "query": "*",
+ * "from": 0,
+ * "size": 25,
+ * "sort": [
+ * {
+ * "field": "threat:triage:score",
+ * "sortOrder": "desc"
+ * }
+ * ]
+ * }
+ */
+ @Multiline
+ public static String sortDescendingWithMissingFields;
+
+ /**
+ * {
* "indices": ["bro", "snort"],
* "query": "*",
* "from": 4,
@@ -407,6 +447,7 @@
SearchResponse response = dao.search(request);
Assert.assertEquals(10, response.getTotal());
List<SearchResult> results = response.getResults();
+ Assert.assertEquals(10, results.size());
for(int i = 0;i < 5;++i) {
Assert.assertEquals("snort", results.get(i).getSource().get("source:type"));
Assert.assertEquals(10-i, results.get(i).getSource().get("timestamp"));
@@ -461,6 +502,50 @@
Assert.assertEquals(i, results.get(i-8001).getSource().get("ip_src_port"));
}
}
+ //Sort descending with missing fields
+ {
+ SearchRequest request = JSONUtils.INSTANCE.load(sortDescendingWithMissingFields, SearchRequest.class);
+ SearchResponse response = dao.search(request);
+ Assert.assertEquals(10, response.getTotal());
+ List<SearchResult> results = response.getResults();
+ Assert.assertEquals(10, results.size());
+
+ // validate sorted order - there are only 2 with a 'threat:triage:score'
+ Assert.assertEquals("20", results.get(0).getSource().get("threat:triage:score"));
+ Assert.assertEquals("10", results.get(1).getSource().get("threat:triage:score"));
+
+ // the remaining are missing the 'threat:triage:score' and should be sorted last
+ Assert.assertFalse(results.get(2).getSource().containsKey("threat:triage:score"));
+ Assert.assertFalse(results.get(3).getSource().containsKey("threat:triage:score"));
+ Assert.assertFalse(results.get(4).getSource().containsKey("threat:triage:score"));
+ Assert.assertFalse(results.get(5).getSource().containsKey("threat:triage:score"));
+ Assert.assertFalse(results.get(6).getSource().containsKey("threat:triage:score"));
+ Assert.assertFalse(results.get(7).getSource().containsKey("threat:triage:score"));
+ Assert.assertFalse(results.get(8).getSource().containsKey("threat:triage:score"));
+ Assert.assertFalse(results.get(9).getSource().containsKey("threat:triage:score"));
+ }
+ //Sort ascending with missing fields
+ {
+ SearchRequest request = JSONUtils.INSTANCE.load(sortAscendingWithMissingFields, SearchRequest.class);
+ SearchResponse response = dao.search(request);
+ Assert.assertEquals(10, response.getTotal());
+ List<SearchResult> results = response.getResults();
+ Assert.assertEquals(10, results.size());
+
+ // the remaining are missing the 'threat:triage:score' and should be sorted last
+ Assert.assertFalse(results.get(0).getSource().containsKey("threat:triage:score"));
+ Assert.assertFalse(results.get(1).getSource().containsKey("threat:triage:score"));
+ Assert.assertFalse(results.get(2).getSource().containsKey("threat:triage:score"));
+ Assert.assertFalse(results.get(3).getSource().containsKey("threat:triage:score"));
+ Assert.assertFalse(results.get(4).getSource().containsKey("threat:triage:score"));
+ Assert.assertFalse(results.get(5).getSource().containsKey("threat:triage:score"));
+ Assert.assertFalse(results.get(6).getSource().containsKey("threat:triage:score"));
+ Assert.assertFalse(results.get(7).getSource().containsKey("threat:triage:score"));
+
+ // validate sorted order - there are only 2 with a 'threat:triage:score'
+ Assert.assertEquals("10", results.get(8).getSource().get("threat:triage:score"));
+ Assert.assertEquals("20", results.get(9).getSource().get("threat:triage:score"));
+ }
//pagination test case
{
SearchRequest request = JSONUtils.INSTANCE.load(paginationQuery, SearchRequest.class);
@@ -490,13 +575,18 @@
{
SearchRequest request = JSONUtils.INSTANCE.load(facetQuery, SearchRequest.class);
SearchResponse response = dao.search(request);
- Assert.assertEquals(10, response.getTotal());
+ Assert.assertEquals(12, response.getTotal());
+
Map<String, Map<String, Long>> facetCounts = response.getFacetCounts();
Assert.assertEquals(8, facetCounts.size());
+
+ // source:type
Map<String, Long> sourceTypeCounts = facetCounts.get("source:type");
Assert.assertEquals(2, sourceTypeCounts.size());
Assert.assertEquals(new Long(5), sourceTypeCounts.get("bro"));
Assert.assertEquals(new Long(5), sourceTypeCounts.get("snort"));
+
+ // ip_src_addr
Map<String, Long> ipSrcAddrCounts = facetCounts.get("ip_src_addr");
Assert.assertEquals(8, ipSrcAddrCounts.size());
Assert.assertEquals(new Long(3), ipSrcAddrCounts.get("192.168.1.1"));
@@ -507,6 +597,8 @@
Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.6"));
Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.7"));
Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.8"));
+
+ // ip_src_port
Map<String, Long> ipSrcPortCounts = facetCounts.get("ip_src_port");
Assert.assertEquals(10, ipSrcPortCounts.size());
Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8001"));
@@ -519,10 +611,14 @@
Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8008"));
Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8009"));
Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8010"));
+
+ // long_field
Map<String, Long> longFieldCounts = facetCounts.get("long_field");
Assert.assertEquals(2, longFieldCounts.size());
Assert.assertEquals(new Long(8), longFieldCounts.get("10000"));
Assert.assertEquals(new Long(2), longFieldCounts.get("20000"));
+
+ // timestamp
Map<String, Long> timestampCounts = facetCounts.get("timestamp");
Assert.assertEquals(10, timestampCounts.size());
Assert.assertEquals(new Long(1), timestampCounts.get("1"));
@@ -535,6 +631,8 @@
Assert.assertEquals(new Long(1), timestampCounts.get("8"));
Assert.assertEquals(new Long(1), timestampCounts.get("9"));
Assert.assertEquals(new Long(1), timestampCounts.get("10"));
+
+ // latitude
Map<String, Long> latitudeCounts = facetCounts.get("latitude");
Assert.assertEquals(2, latitudeCounts.size());
List<String> latitudeKeys = new ArrayList<>(latitudeCounts.keySet());
@@ -543,6 +641,8 @@
Assert.assertEquals(48.5839, Double.parseDouble(latitudeKeys.get(1)), 0.00001);
Assert.assertEquals(new Long(2), latitudeCounts.get(latitudeKeys.get(0)));
Assert.assertEquals(new Long(8), latitudeCounts.get(latitudeKeys.get(1)));
+
+ // score
Map<String, Long> scoreFieldCounts = facetCounts.get("score");
Assert.assertEquals(4, scoreFieldCounts.size());
List<String> scoreFieldKeys = new ArrayList<>(scoreFieldCounts.keySet());
@@ -555,6 +655,8 @@
Assert.assertEquals(new Long(2), scoreFieldCounts.get(scoreFieldKeys.get(1)));
Assert.assertEquals(new Long(3), scoreFieldCounts.get(scoreFieldKeys.get(2)));
Assert.assertEquals(new Long(1), scoreFieldCounts.get(scoreFieldKeys.get(3)));
+
+ // is_alert
Map<String, Long> isAlertCounts = facetCounts.get("is_alert");
Assert.assertEquals(2, isAlertCounts.size());
Assert.assertEquals(new Long(6), isAlertCounts.get("true"));
@@ -568,7 +670,7 @@
Assert.fail("Exception expected, but did not come.");
}
catch(InvalidSearchException ise) {
- Assert.assertEquals("Could not execute search", ise.getMessage());
+ // success
}
}
//Disabled facet query
@@ -591,7 +693,7 @@
// getColumnMetadata with multiple indices
{
Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Arrays.asList("bro", "snort"));
- Assert.assertEquals(13, fieldTypes.size());
+ Assert.assertEquals(15, fieldTypes.size());
Assert.assertEquals(FieldType.STRING, fieldTypes.get("guid"));
Assert.assertEquals(FieldType.STRING, fieldTypes.get("source:type"));
Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
@@ -605,25 +707,51 @@
Assert.assertEquals(FieldType.STRING, fieldTypes.get("bro_field"));
Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field"));
Assert.assertEquals(FieldType.OTHER, fieldTypes.get("duplicate_name_field"));
+ Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("threat:triage:score"));
+ Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert"));
}
// getColumnMetadata with only bro
{
Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("bro"));
- Assert.assertEquals(12, fieldTypes.size());
+ Assert.assertEquals(13, fieldTypes.size());
+ Assert.assertEquals(FieldType.STRING, fieldTypes.get("guid"));
+ Assert.assertEquals(FieldType.STRING, fieldTypes.get("source:type"));
+ Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
+ Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
+ Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
+ Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
+ Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
+ Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
+ Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
+ Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
Assert.assertEquals(FieldType.STRING, fieldTypes.get("bro_field"));
+ Assert.assertEquals(FieldType.STRING, fieldTypes.get("duplicate_name_field"));
+ Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert"));
}
// getColumnMetadata with only snort
{
Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("snort"));
- Assert.assertEquals(12, fieldTypes.size());
+ Assert.assertEquals(14, fieldTypes.size());
Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field"));
+ Assert.assertEquals(FieldType.STRING, fieldTypes.get("guid"));
+ Assert.assertEquals(FieldType.STRING, fieldTypes.get("source:type"));
+ Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
+ Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
+ Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
+ Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
+ Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
+ Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
+ Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
+ Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
+ Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("duplicate_name_field"));
+ Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert"));
}
// getColumnMetadata with an index that doesn't exist
{
Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("someindex"));
Assert.assertEquals(0, fieldTypes.size());
}
- //Fields query
+ //Fields query
{
SearchRequest request = JSONUtils.INSTANCE.load(fieldsQuery, SearchRequest.class);
SearchResponse response = dao.search(request);
@@ -793,7 +921,7 @@
Assert.fail("Exception expected, but did not come.");
}
catch(InvalidSearchException ise) {
- Assert.assertEquals("Could not execute search", ise.getMessage());
+ // success
}
}
//Group by IP query