METRON-1301 Alerts UI - Sorting on Triage Score Unexpectedly Filters Some Records (nickwallen) closes apache/metron#832
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template
index 7db006e..3a68d75 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template
@@ -98,7 +98,7 @@
           "mapping": {
             "type": "float"
           },
-          "match": "threat.triage.rules:*:score",
+          "match": "threat:triage:*score",
           "match_mapping_type": "*"
         }
       },
@@ -107,7 +107,7 @@
           "mapping": {
             "type": "string"
           },
-          "match": "threat.triage.rules:*:reason",
+          "match": "threat:triage:rules:*:reason",
           "match_mapping_type": "*"
         }
       },
@@ -116,9 +116,9 @@
           "mapping": {
             "type": "string"
           },
-          "match": "threat.triage.rules:*:name",
+          "match": "threat:triage:rules:*:name",
           "match_mapping_type": "*"
-        }
+      }
       }
       ],
       "properties": {
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template
index f13a9ee..7c6b401 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template
@@ -98,28 +98,28 @@
           "mapping": {
             "type": "float"
           },
-          "match": "threat.triage.rules:*:score",
+          "match": "threat:triage:*score",
           "match_mapping_type": "*"
         }
       },
-        {
-          "threat_triage_reason": {
-            "mapping": {
-              "type": "string"
-            },
-            "match": "threat.triage.rules:*:reason",
-            "match_mapping_type": "*"
-          }
-        },
-        {
-          "threat_triage_name": {
-            "mapping": {
-              "type": "string"
-            },
-            "match": "threat.triage.rules:*:name",
-            "match_mapping_type": "*"
-          }
+      {
+        "threat_triage_reason": {
+          "mapping": {
+            "type": "string"
+          },
+          "match": "threat:triage:rules:*:reason",
+          "match_mapping_type": "*"
         }
+      },
+      {
+        "threat_triage_name": {
+          "mapping": {
+            "type": "string"
+          },
+          "match": "threat:triage:rules:*:name",
+          "match_mapping_type": "*"
+        }
+      }
       ],
       "properties": {
         "timestamp": {
@@ -195,9 +195,6 @@
         "tcpwindow": {
           "type": "string"
         },
-        "threat:triage:level": {
-          "type": "double"
-        },
         "tos": {
           "type": "integer"
         },
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template
index d84235d..d100eb0 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template
@@ -98,7 +98,7 @@
           "mapping": {
             "type": "float"
           },
-          "match": "threat.triage.rules:*:score",
+          "match": "threat:triage:*score",
           "match_mapping_type": "*"
         }
       },
@@ -107,7 +107,7 @@
           "mapping": {
             "type": "string"
           },
-          "match": "threat.triage.rules:*:reason",
+          "match": "threat:triage:rules:*:reason",
           "match_mapping_type": "*"
         }
       },
@@ -116,7 +116,7 @@
           "mapping": {
             "type": "string"
           },
-          "match": "threat.triage.rules:*:name",
+          "match": "threat:triage:rules:*:name",
           "match_mapping_type": "*"
         }
       }
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
index 4ce9644..25bb809 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
@@ -34,6 +34,10 @@
 import org.springframework.context.annotation.Configuration;
 import org.springframework.core.env.Environment;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
 @Configuration
 public class IndexConfig {
 
@@ -57,6 +61,7 @@
       int searchMaxGroups = environment.getProperty(MetronRestConstants.SEARCH_MAX_GROUPS, Integer.class, 1000);
       String metaDaoImpl = environment.getProperty(MetronRestConstants.META_DAO_IMPL, String.class, null);
       String metaDaoSort = environment.getProperty(MetronRestConstants.META_DAO_SORT, String.class, null);
+
       AccessConfig config = new AccessConfig();
       config.setMaxSearchResults(searchMaxResults);
       config.setMaxSearchGroups(searchMaxGroups);
@@ -84,6 +89,7 @@
       MetaAlertDao ret = (MetaAlertDao) IndexDaoFactory.create(metaDaoImpl, config).get(0);
       ret.init(indexDao, Optional.ofNullable(metaDaoSort));
       return ret;
+
     }
     catch(RuntimeException re) {
       throw re;
diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
index 3673654..78a1e20 100644
--- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
+++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
@@ -89,8 +89,8 @@
   public void setup() throws Exception {
     this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build();
     ImmutableMap<String, String> testData = ImmutableMap.of(
-        "bro_index_2017.01.01.01", SearchIntegrationTest.broData,
-        "snort_index_2017.01.01.01", SearchIntegrationTest.snortData
+            "bro_index_2017.01.01.01", SearchIntegrationTest.broData,
+            "snort_index_2017.01.01.01", SearchIntegrationTest.snortData
     );
     loadTestData(testData);
     loadColumnTypes();
@@ -114,19 +114,19 @@
     }});
 
     assertEventually(() -> this.mockMvc.perform(post(searchUrl + "/search").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(defaultQuery))
-        .andExpect(status().isOk())
-        .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-        .andExpect(jsonPath("$.total").value(5))
-        .andExpect(jsonPath("$.results[0].source.source:type").value("bro"))
-        .andExpect(jsonPath("$.results[0].source.timestamp").value(5))
-        .andExpect(jsonPath("$.results[1].source.source:type").value("bro"))
-        .andExpect(jsonPath("$.results[1].source.timestamp").value(4))
-        .andExpect(jsonPath("$.results[2].source.source:type").value("bro"))
-        .andExpect(jsonPath("$.results[2].source.timestamp").value(3))
-        .andExpect(jsonPath("$.results[3].source.source:type").value("bro"))
-        .andExpect(jsonPath("$.results[3].source.timestamp").value(2))
-        .andExpect(jsonPath("$.results[4].source.source:type").value("bro"))
-        .andExpect(jsonPath("$.results[4].source.timestamp").value(1))
+            .andExpect(status().isOk())
+            .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.total").value(5))
+            .andExpect(jsonPath("$.results[0].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[0].source.timestamp").value(5))
+            .andExpect(jsonPath("$.results[1].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[1].source.timestamp").value(4))
+            .andExpect(jsonPath("$.results[2].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[2].source.timestamp").value(3))
+            .andExpect(jsonPath("$.results[3].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[3].source.timestamp").value(2))
+            .andExpect(jsonPath("$.results[4].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[4].source.timestamp").value(1))
     );
 
     sensorIndexingConfigService.delete("bro");
@@ -288,4 +288,4 @@
     columnTypes.put("snort", snortTypes);
     InMemoryDao.setColumnMetadata(columnTypes);
   }
-}
+}
\ No newline at end of file
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java
new file mode 100644
index 0000000..0393629
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import org.apache.metron.indexing.dao.search.FieldType;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Responsible for retrieving column-level metadata about search indices.
+ */
+public interface ColumnMetadataDao {
+
+  /**
+   * Retrieves column metadata for one or more search indices.
+   * @param indices The search indices to retrieve column metadata for.
+   * @return The column metadata, one set for each search index.
+   * @throws IOException
+   */
+  Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException;
+
+  /**
+   * Finds the latest version of a set of base indices.  This can be used to find
+   * the latest 'bro' index, for example.
+   *
+   * Assuming the following indices exist...
+   *
+   *    [
+   *      'bro_index_2017.10.03.19'
+   *      'bro_index_2017.10.03.20',
+   *      'bro_index_2017.10.03.21',
+   *      'snort_index_2017.10.03.19',
+   *      'snort_index_2017.10.03.20',
+   *      'snort_index_2017.10.03.21'
+   *    ]
+   *
+   *  And the include indices are given as...
+   *
+   *    ['bro', 'snort']
+   *
+   * Then the latest indices are...
+   *
+   *    ['bro_index_2017.10.03.21', 'snort_index_2017.10.03.21']
+   *
+   * @param includeIndices The base names of the indices to include
+   * @return The latest version of a set of indices.
+   */
+  String[] getLatestIndices(List<String> includeIndices);
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
new file mode 100644
index 0000000..8e210b4
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
+import org.elasticsearch.client.AdminClient;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
+
+/**
+ * Responsible for retrieving column-level metadata for Elasticsearch search indices.
+ */
+public class ElasticsearchColumnMetadataDao implements ColumnMetadataDao {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static Map<String, FieldType> elasticsearchTypeMap;
+  static {
+    Map<String, FieldType> fieldTypeMap = new HashMap<>();
+    fieldTypeMap.put("string", FieldType.STRING);
+    fieldTypeMap.put("ip", FieldType.IP);
+    fieldTypeMap.put("integer", FieldType.INTEGER);
+    fieldTypeMap.put("long", FieldType.LONG);
+    fieldTypeMap.put("date", FieldType.DATE);
+    fieldTypeMap.put("float", FieldType.FLOAT);
+    fieldTypeMap.put("double", FieldType.DOUBLE);
+    fieldTypeMap.put("boolean", FieldType.BOOLEAN);
+    elasticsearchTypeMap = Collections.unmodifiableMap(fieldTypeMap);
+  }
+
+  /**
+   * An Elasticsearch administrative client.
+   */
+  private transient AdminClient adminClient;
+
+  /**
+   * @param adminClient The Elasticsearch admin client.
+   */
+  public ElasticsearchColumnMetadataDao(AdminClient adminClient) {
+    this.adminClient = adminClient;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException {
+    Map<String, FieldType> indexColumnMetadata = new HashMap<>();
+    Map<String, String> previousIndices = new HashMap<>();
+    Set<String> fieldBlackList = new HashSet<>();
+
+    String[] latestIndices = getLatestIndices(indices);
+    if (latestIndices.length > 0) {
+      ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = adminClient
+              .indices()
+              .getMappings(new GetMappingsRequest().indices(latestIndices))
+              .actionGet()
+              .getMappings();
+
+      // for each index
+      for (Object key : mappings.keys().toArray()) {
+        String indexName = key.toString();
+        ImmutableOpenMap<String, MappingMetaData> mapping = mappings.get(indexName);
+
+        // for each mapping in the index
+        Iterator<String> mappingIterator = mapping.keysIt();
+        while (mappingIterator.hasNext()) {
+          MappingMetaData mappingMetaData = mapping.get(mappingIterator.next());
+          Map<String, Map<String, String>> map = (Map<String, Map<String, String>>) mappingMetaData
+                  .getSourceAsMap().get("properties");
+
+          // for each field in the mapping
+          for (String field : map.keySet()) {
+            if (!fieldBlackList.contains(field)) {
+              FieldType type = toFieldType(map.get(field).get("type"));
+
+              if(!indexColumnMetadata.containsKey(field)) {
+                indexColumnMetadata.put(field, type);
+
+                // record the last index in which a field exists, to be able to print helpful error message on type mismatch
+                previousIndices.put(field, indexName);
+
+              } else {
+                FieldType previousType = indexColumnMetadata.get(field);
+                if (!type.equals(previousType)) {
+                  String previousIndexName = previousIndices.get(field);
+                  LOG.error(String.format(
+                          "Field type mismatch: %s.%s has type %s while %s.%s has type %s.  Defaulting type to %s.",
+                          indexName, field, type.getFieldType(),
+                          previousIndexName, field, previousType.getFieldType(),
+                          FieldType.OTHER.getFieldType()));
+                  indexColumnMetadata.put(field, FieldType.OTHER);
+
+                  // the field is defined in multiple indices with different types; ignore the field as type has been set to OTHER
+                  fieldBlackList.add(field);
+                }
+              }
+            }
+          }
+        }
+      }
+    } else {
+      LOG.info(String.format("Unable to find any latest indices; indices=%s", indices));
+    }
+
+    return indexColumnMetadata;
+
+  }
+
+  /**
+   * Retrieves the latest indices.
+   * @param includeIndices
+   * @return
+   */
+  @Override
+  public String[] getLatestIndices(List<String> includeIndices) {
+    LOG.debug("Getting latest indices; indices={}", includeIndices);
+    Map<String, String> latestIndices = new HashMap<>();
+    String[] indices = adminClient
+            .indices()
+            .prepareGetIndex()
+            .setFeatures()
+            .get()
+            .getIndices();
+
+    for (String index : indices) {
+      int prefixEnd = index.indexOf(INDEX_NAME_DELIMITER);
+      if (prefixEnd != -1) {
+        String prefix = index.substring(0, prefixEnd);
+        if (includeIndices.contains(prefix)) {
+          String latestIndex = latestIndices.get(prefix);
+          if (latestIndex == null || index.compareTo(latestIndex) > 0) {
+            latestIndices.put(prefix, index);
+          }
+        }
+      }
+    }
+
+    return latestIndices.values().toArray(new String[latestIndices.size()]);
+  }
+
+  /**
+   * Converts a string type to the corresponding FieldType.
+   * @param type The type to convert.
+   * @return The corresponding FieldType or FieldType.OTHER, if no match.
+   */
+  private FieldType toFieldType(String type) {
+    return elasticsearchTypeMap.getOrDefault(type, FieldType.OTHER);
+  }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index 87ad7f7..910c09b 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -17,26 +17,8 @@
  */
 package org.apache.metron.elasticsearch.dao;
 
-import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
-
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
@@ -52,19 +34,16 @@
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
+import org.apache.metron.indexing.dao.search.SortField;
 import org.apache.metron.indexing.dao.search.SortOrder;
 import org.apache.metron.indexing.dao.update.Document;
 import org.elasticsearch.action.ActionWriteResponse.ShardInfo;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.index.mapper.ip.IpFieldMapper;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
@@ -80,18 +59,64 @@
 import org.elasticsearch.search.aggregations.metrics.sum.Sum;
 import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
 
 public class ElasticsearchDao implements IndexDao {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The value required to ensure that Elasticsearch sorts missing values last.
+   */
+  private static final String SORT_MISSING_LAST = "_last";
+
+  /**
+   * The value required to ensure that Elasticsearch sorts missing values last.
+   */
+  private static final String SORT_MISSING_FIRST = "_first";
+
+  /**
+   * The Elasticsearch client.
+   */
   private transient TransportClient client;
+
+  /**
+   * Retrieves column metadata about search indices.
+   */
+  private ColumnMetadataDao columnMetadataDao;
+
+  /**
+   * Handles the submission of search requests to Elasticsearch.
+   */
+  private ElasticsearchRequestSubmitter requestSubmitter;
+
   private AccessConfig accessConfig;
 
-  protected ElasticsearchDao(TransportClient client, AccessConfig config) {
+  protected ElasticsearchDao(TransportClient client,
+                             ColumnMetadataDao columnMetadataDao,
+                             ElasticsearchRequestSubmitter requestSubmitter,
+                             AccessConfig config) {
     this.client = client;
+    this.columnMetadataDao = columnMetadataDao;
+    this.requestSubmitter = requestSubmitter;
     this.accessConfig = config;
   }
 
@@ -99,21 +124,6 @@
     //uninitialized.
   }
 
-  private static Map<String, FieldType> elasticsearchSearchTypeMap;
-
-  static {
-    Map<String, FieldType> fieldTypeMap = new HashMap<>();
-    fieldTypeMap.put("string", FieldType.STRING);
-    fieldTypeMap.put("ip", FieldType.IP);
-    fieldTypeMap.put("integer", FieldType.INTEGER);
-    fieldTypeMap.put("long", FieldType.LONG);
-    fieldTypeMap.put("date", FieldType.DATE);
-    fieldTypeMap.put("float", FieldType.FLOAT);
-    fieldTypeMap.put("double", FieldType.DOUBLE);
-    fieldTypeMap.put("boolean", FieldType.BOOLEAN);
-    elasticsearchSearchTypeMap = Collections.unmodifiableMap(fieldTypeMap);
-  }
-
   @Override
   public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException {
     return search(searchRequest, new QueryStringQueryBuilder(searchRequest.getQuery()));
@@ -121,56 +131,139 @@
 
   /**
    * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} for the query.
-   * @param searchRequest The request defining the parameters of the search
+   * @param request The request defining the parameters of the search
    * @param queryBuilder The actual query to be run. Intended for if the SearchRequest requires wrapping
    * @return The results of the query
    * @throws InvalidSearchException When the query is malformed or the current state doesn't allow search
    */
-  protected SearchResponse search(SearchRequest searchRequest, QueryBuilder queryBuilder) throws InvalidSearchException {
+  protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder) throws InvalidSearchException {
+    org.elasticsearch.action.search.SearchRequest esRequest;
+    org.elasticsearch.action.search.SearchResponse esResponse;
+
     if(client == null) {
       throw new InvalidSearchException("Uninitialized Dao!  You must call init() prior to use.");
     }
-    if (searchRequest.getSize() > accessConfig.getMaxSearchResults()) {
+
+    if (request.getSize() > accessConfig.getMaxSearchResults()) {
       throw new InvalidSearchException("Search result size must be less than " + accessConfig.getMaxSearchResults());
     }
-    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
+
+    esRequest = buildSearchRequest(request, queryBuilder);
+    esResponse = requestSubmitter.submitSearch(esRequest);
+    return buildSearchResponse(request, esResponse);
+  }
+
+  /**
+   * Builds an Elasticsearch search request.
+   * @param searchRequest The Metron search request.
+   * @param queryBuilder
+   * @return An Elasticsearch search request.
+   */
+  private org.elasticsearch.action.search.SearchRequest buildSearchRequest(
+          SearchRequest searchRequest,
+          QueryBuilder queryBuilder) throws InvalidSearchException {
+
+    LOG.debug("Got search request; request={}", ElasticsearchUtils.toJSON(searchRequest).orElse("???"));
+    SearchSourceBuilder searchBuilder = new SearchSourceBuilder()
             .size(searchRequest.getSize())
             .from(searchRequest.getFrom())
             .query(queryBuilder)
             .trackScores(true);
-    searchRequest.getSort().forEach(sortField -> searchSourceBuilder.sort(sortField.getField(), getElasticsearchSortOrder(sortField.getSortOrder())));
-    Optional<List<String>> fields = searchRequest.getFields();
-    if (fields.isPresent()) {
-      searchSourceBuilder.fields(fields.get());
-    } else {
-      searchSourceBuilder.fetchSource(true);
-    }
-    Optional<List<String>> facetFields = searchRequest.getFacetFields();
-    if (facetFields.isPresent()) {
-      facetFields.get().forEach(field -> searchSourceBuilder.aggregation(new TermsBuilder(getFacentAggregationName(field)).field(field)));
-    }
-    String[] wildcardIndices = wildcardIndices(searchRequest.getIndices());
-    org.elasticsearch.action.search.SearchResponse elasticsearchResponse;
+
+    // column metadata needed to understand the type of each sort field
+    Map<String, FieldType> meta;
     try {
-      elasticsearchResponse = client.search(new org.elasticsearch.action.search.SearchRequest(wildcardIndices)
-              .source(searchSourceBuilder)).actionGet();
-    } catch (SearchPhaseExecutionException e) {
-      LOG.error("Could not execute search", e);
-      throw new InvalidSearchException("Could not execute search", e);
+      meta = getColumnMetadata(searchRequest.getIndices());
+    } catch(IOException e) {
+      throw new InvalidSearchException("Unable to get column metadata", e);
     }
+
+    // handle sort fields
+    for(SortField sortField : searchRequest.getSort()) {
+
+      // what type is the sort field?
+      FieldType sortFieldType = meta.getOrDefault(sortField.getField(), FieldType.OTHER);
+
+      // sort order - if ascending missing values sorted last. otherwise, missing values sorted first
+      org.elasticsearch.search.sort.SortOrder sortOrder = getElasticsearchSortOrder(sortField.getSortOrder());
+      String missingSortOrder;
+      if(sortOrder == org.elasticsearch.search.sort.SortOrder.DESC) {
+        missingSortOrder = SORT_MISSING_LAST;
+      } else {
+        missingSortOrder = SORT_MISSING_FIRST;
+      }
+
+      // sort by the field - missing fields always last
+      FieldSortBuilder sortBy = new FieldSortBuilder(sortField.getField())
+              .order(sortOrder)
+              .missing(missingSortOrder)
+              .unmappedType(sortFieldType.getFieldType());
+      searchBuilder.sort(sortBy);
+    }
+
+    // handle search fields
+    if (searchRequest.getFields().isPresent()) {
+      searchBuilder.fields(searchRequest.getFields().get());
+    } else {
+      searchBuilder.fetchSource(true);
+    }
+
+    // handle facet fields
+    if (searchRequest.getFacetFields().isPresent()) {
+      for(String field : searchRequest.getFacetFields().get()) {
+        String name = getFacentAggregationName(field);
+        TermsBuilder terms = new TermsBuilder(name).field(field);
+        searchBuilder.aggregation(terms);
+      }
+    }
+
+    // return the search request
+    String[] indices = wildcardIndices(searchRequest.getIndices());
+    LOG.debug("Built Elasticsearch request; indices={}, request={}", indices, searchBuilder.toString());
+    return new org.elasticsearch.action.search.SearchRequest()
+            .indices(indices)
+            .source(searchBuilder);
+  }
+
+  /**
+   * Builds a search response.
+   *
+   * This effectively transforms an Elasticsearch search response into a Metron search response.
+   *
+   * @param searchRequest The Metron search request.
+   * @param esResponse The Elasticsearch search response.
+   * @return A Metron search response.
+   * @throws InvalidSearchException
+   */
+  private SearchResponse buildSearchResponse(
+          SearchRequest searchRequest,
+          org.elasticsearch.action.search.SearchResponse esResponse) throws InvalidSearchException {
+
     SearchResponse searchResponse = new SearchResponse();
-    searchResponse.setTotal(elasticsearchResponse.getHits().getTotalHits());
-    searchResponse.setResults(Arrays.stream(elasticsearchResponse.getHits().getHits()).map(searchHit ->
-        getSearchResult(searchHit, fields.isPresent())).collect(Collectors.toList()));
-    if (facetFields.isPresent()) {
+    searchResponse.setTotal(esResponse.getHits().getTotalHits());
+
+    // search hits --> search results
+    List<SearchResult> results = new ArrayList<>();
+    for(SearchHit hit: esResponse.getHits().getHits()) {
+      results.add(getSearchResult(hit, searchRequest.getFields().isPresent()));
+    }
+    searchResponse.setResults(results);
+
+    // handle facet fields
+    if (searchRequest.getFacetFields().isPresent()) {
+      List<String> facetFields = searchRequest.getFacetFields().get();
       Map<String, FieldType> commonColumnMetadata;
       try {
         commonColumnMetadata = getColumnMetadata(searchRequest.getIndices());
       } catch (IOException e) {
-        throw new InvalidSearchException(String.format("Could not get common column metadata for indices %s", Arrays.toString(searchRequest.getIndices().toArray())));
+        throw new InvalidSearchException(String.format(
+                "Could not get common column metadata for indices %s",
+                Arrays.toString(searchRequest.getIndices().toArray())));
       }
-      searchResponse.setFacetCounts(getFacetCounts(facetFields.get(), elasticsearchResponse.getAggregations(), commonColumnMetadata ));
+      searchResponse.setFacetCounts(getFacetCounts(facetFields, esResponse.getAggregations(), commonColumnMetadata ));
     }
+
+    LOG.debug("Built search response; response={}", ElasticsearchUtils.toJSON(searchResponse).orElse("???"));
     return searchResponse;
   }
 
@@ -188,42 +281,76 @@
    */
   protected GroupResponse group(GroupRequest groupRequest, QueryBuilder queryBuilder)
       throws InvalidSearchException {
+    org.elasticsearch.action.search.SearchRequest esRequest;
+    org.elasticsearch.action.search.SearchResponse esResponse;
+
     if (client == null) {
       throw new InvalidSearchException("Uninitialized Dao!  You must call init() prior to use.");
     }
     if (groupRequest.getGroups() == null || groupRequest.getGroups().size() == 0) {
       throw new InvalidSearchException("At least 1 group must be provided.");
     }
-    final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
-    searchSourceBuilder.query(queryBuilder);
-    searchSourceBuilder.aggregation(getGroupsTermBuilder(groupRequest, 0));
-    String[] wildcardIndices = wildcardIndices(groupRequest.getIndices());
-    org.elasticsearch.action.search.SearchRequest request;
-    org.elasticsearch.action.search.SearchResponse response;
 
-    try {
-      request = new org.elasticsearch.action.search.SearchRequest(wildcardIndices)
-          .source(searchSourceBuilder);
-      response = client.search(request).actionGet();
-    } catch (SearchPhaseExecutionException e) {
-      throw new InvalidSearchException("Could not execute search", e);
-    }
+    esRequest = buildGroupRequest(groupRequest, queryBuilder);
+    esResponse = requestSubmitter.submitSearch(esRequest);
+    GroupResponse response = buildGroupResponse(groupRequest, esResponse);
+
+    return response;
+  }
+
+  /**
+   * Builds a group search request.
+   * @param groupRequest The Metron group request.
+   * @param queryBuilder The search query.
+   * @return An Elasticsearch search request.
+   */
+  private org.elasticsearch.action.search.SearchRequest buildGroupRequest(
+          GroupRequest groupRequest,
+          QueryBuilder queryBuilder) {
+
+    // handle groups
+    TermsBuilder groups = getGroupsTermBuilder(groupRequest, 0);
+    final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
+            .query(queryBuilder)
+            .aggregation(groups);
+
+    // return the search request
+    String[] indices = wildcardIndices(groupRequest.getIndices());
+    return new org.elasticsearch.action.search.SearchRequest()
+            .indices(indices)
+            .source(searchSourceBuilder);
+  }
+
+  /**
+   * Build a group response.
+   * @param groupRequest The original group request.
+   * @param response The search response.
+   * @return A group response.
+   * @throws InvalidSearchException
+   */
+  private GroupResponse buildGroupResponse(
+          GroupRequest groupRequest,
+          org.elasticsearch.action.search.SearchResponse response) throws InvalidSearchException {
+
+    // build the search response
     Map<String, FieldType> commonColumnMetadata;
     try {
       commonColumnMetadata = getColumnMetadata(groupRequest.getIndices());
     } catch (IOException e) {
-      throw new InvalidSearchException(String
-          .format("Could not get common column metadata for indices %s",
+      throw new InvalidSearchException(String.format("Could not get common column metadata for indices %s",
               Arrays.toString(groupRequest.getIndices().toArray())));
     }
+
     GroupResponse groupResponse = new GroupResponse();
     groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField());
-    groupResponse.setGroupResults(
-        getGroupResults(groupRequest, 0, response.getAggregations(), commonColumnMetadata));
+    groupResponse.setGroupResults(getGroupResults(groupRequest, 0, response.getAggregations(), commonColumnMetadata));
     return groupResponse;
   }
 
   private String[] wildcardIndices(List<String> indices) {
+    if(indices == null)
+      return new String[] {};
+
     return indices
             .stream()
             .map(index -> String.format("%s%s*", index, INDEX_NAME_DELIMITER))
@@ -235,26 +362,43 @@
     if(this.client == null) {
       this.client = ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get(), config.getOptionalSettings());
       this.accessConfig = config;
+      this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client.admin());
+      this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client);
+    }
+
+    if(columnMetadataDao == null) {
+      throw new IllegalArgumentException("No ColumnMetadataDao available");
+    }
+
+    if(requestSubmitter == null) {
+      throw new IllegalArgumentException("No ElasticsearchRequestSubmitter available");
     }
   }
 
   @Override
   public Document getLatest(final String guid, final String sensorType) throws IOException {
-    Optional<Document> ret = searchByGuid(
-            guid
-            , sensorType
-            , hit -> {
-              Long ts = 0L;
-              String doc = hit.getSourceAsString();
-              String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null);
-              try {
-                return Optional.of(new Document(doc, guid, sourceType, ts));
-              } catch (IOException e) {
-                throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
-              }
-            }
-            );
-    return ret.orElse(null);
+    Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit));
+    return doc.orElse(null);
+  }
+
+  private Optional<Document> toDocument(final String guid, SearchHit hit) {
+    Long ts = 0L;
+    String doc = hit.getSourceAsString();
+    String sourceType = toSourceType(hit.getType());
+    try {
+      return Optional.of(new Document(doc, guid, sourceType, ts));
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Returns the source type based on a given doc type.
+   * @param docType The document type.
+   * @return The source type.
+   */
+  private String toSourceType(String docType) {
+    return Iterables.getFirst(Splitter.on("_doc").split(docType), null);
   }
 
   @Override
@@ -394,8 +538,7 @@
     String type = sensorType + "_doc";
     Object ts = update.getTimestamp();
     IndexRequest indexRequest = new IndexRequest(indexName, type, update.getGuid())
-        .source(update.getDocument())
-        ;
+        .source(update.getDocument());
     if(ts != null) {
       indexRequest = indexRequest.timestamp(ts.toString());
     }
@@ -403,77 +546,9 @@
     return indexRequest;
   }
 
-  @SuppressWarnings("unchecked")
   @Override
   public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException {
-    Map<String, FieldType> indexColumnMetadata = new HashMap<>();
-
-    // Keep track of the last index used to inspect a field type so we can print a helpful error message on type mismatch
-    Map<String, String> previousIndices = new HashMap<>();
-    // If we have detected a field type mismatch, ignore the field going forward since the type has been set to OTHER
-    Set<String> fieldBlackList = new HashSet<>();
-
-    String[] latestIndices = getLatestIndices(indices);
-    if (latestIndices.length > 0) {
-      ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = client
-          .admin()
-          .indices()
-          .getMappings(new GetMappingsRequest().indices(latestIndices))
-          .actionGet()
-          .getMappings();
-      for (Object key : mappings.keys().toArray()) {
-        String indexName = key.toString();
-        ImmutableOpenMap<String, MappingMetaData> mapping = mappings.get(indexName);
-        Iterator<String> mappingIterator = mapping.keysIt();
-        while (mappingIterator.hasNext()) {
-          MappingMetaData mappingMetaData = mapping.get(mappingIterator.next());
-          Map<String, Map<String, String>> map = (Map<String, Map<String, String>>) mappingMetaData
-              .getSourceAsMap().get("properties");
-          for (String field : map.keySet()) {
-            if (!fieldBlackList.contains(field)) {
-              FieldType type = elasticsearchSearchTypeMap
-                  .getOrDefault(map.get(field).get("type"), FieldType.OTHER);
-              if (indexColumnMetadata.containsKey(field)) {
-                FieldType previousType = indexColumnMetadata.get(field);
-                if (!type.equals(previousType)) {
-                  String previousIndexName = previousIndices.get(field);
-                  LOG.error(String.format(
-                      "Field type mismatch: %s.%s has type %s while %s.%s has type %s.  Defaulting type to %s.",
-                      indexName, field, type.getFieldType(),
-                      previousIndexName, field, previousType.getFieldType(),
-                      FieldType.OTHER.getFieldType()));
-                  indexColumnMetadata.put(field, FieldType.OTHER);
-                  // Detected a type mismatch so ignore the field from now on
-                  fieldBlackList.add(field);
-                }
-              } else {
-                indexColumnMetadata.put(field, type);
-                previousIndices.put(field, indexName);
-              }
-            }
-          }
-        }
-      }
-    }
-    return indexColumnMetadata;
-  }
-
-  protected String[] getLatestIndices(List<String> includeIndices) {
-    Map<String, String> latestIndices = new HashMap<>();
-    String[] indices = client.admin().indices().prepareGetIndex().setFeatures().get().getIndices();
-    for (String index : indices) {
-      int prefixEnd = index.indexOf(INDEX_NAME_DELIMITER);
-      if (prefixEnd != -1) {
-        String prefix = index.substring(0, prefixEnd);
-        if (includeIndices.contains(prefix)) {
-          String latestIndex = latestIndices.get(prefix);
-          if (latestIndex == null || index.compareTo(latestIndex) > 0) {
-            latestIndices.put(prefix, index);
-          }
-        }
-      }
-    }
-    return latestIndices.values().toArray(new String[latestIndices.size()]);
+    return columnMetadataDao.getColumnMetadata(indices);
   }
 
   private org.elasticsearch.search.sort.SortOrder getElasticsearchSortOrder(
@@ -588,4 +663,19 @@
   private String getSumAggregationName(String field) {
     return String.format("%s_score", field);
   }
+
+  public ElasticsearchDao client(TransportClient client) {
+    this.client = client;
+    return this;
+  }
+
+  public ElasticsearchDao columnMetadataDao(ColumnMetadataDao columnMetadataDao) {
+    this.columnMetadataDao = columnMetadataDao;
+    return this;
+  }
+
+  public ElasticsearchDao accessConfig(AccessConfig accessConfig) {
+    this.accessConfig = accessConfig;
+    return this;
+  }
 }
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
new file mode 100644
index 0000000..0e0df21
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.elasticsearch.action.search.SearchPhaseExecutionException;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.rest.RestStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+
+/**
+ * Responsible for submitting requests to Elasticsearch.
+ */
+public class ElasticsearchRequestSubmitter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The Elasticsearch client.
+   */
+  private TransportClient client;
+
+  public ElasticsearchRequestSubmitter(TransportClient client) {
+    this.client = client;
+  }
+
+  /**
+   * Submit a search to Elasticsearch.
+   * @param request A search request.
+   * @return The search response.
+   */
+  public SearchResponse submitSearch(SearchRequest request) throws InvalidSearchException {
+    LOG.debug("About to submit a search; request={}", ElasticsearchUtils.toJSON(request).orElse("???"));
+
+    // submit the search request
+    org.elasticsearch.action.search.SearchResponse esResponse;
+    try {
+      esResponse = client
+              .search(request)
+              .actionGet();
+      LOG.debug("Got Elasticsearch response; response={}", esResponse.toString());
+
+    } catch (SearchPhaseExecutionException e) {
+      String msg = String.format(
+              "Failed to execute search; error='%s', search='%s'",
+              ExceptionUtils.getRootCauseMessage(e),
+              ElasticsearchUtils.toJSON(request).orElse("???"));
+      LOG.error(msg, e);
+      throw new InvalidSearchException(msg, e);
+    }
+
+    // check for shard failures
+    if(esResponse.getFailedShards() > 0) {
+      handleShardFailures(request, esResponse);
+    }
+
+    // validate the response status
+    if(RestStatus.OK == esResponse.status()) {
+      return esResponse;
+
+    } else {
+      // the search was not successful
+      String msg = String.format(
+              "Bad search response; status=%s, timeout=%s, terminatedEarly=%s",
+              esResponse.status(), esResponse.isTimedOut(), esResponse.isTerminatedEarly());
+      LOG.error(msg);
+      throw new InvalidSearchException(msg);
+    }
+  }
+
+  /**
+   * Handle individual shard failures that can occur even when the response is OK.  These
+   * can indicate misconfiguration of the search indices.
+   * @param request The search request.
+   * @param response  The search response.
+   */
+  private void handleShardFailures(
+          org.elasticsearch.action.search.SearchRequest request,
+          org.elasticsearch.action.search.SearchResponse response) {
+    /*
+     * shard failures are only logged.  the search itself is not failed.  this approach
+     * assumes that a user is interested in partial search results, even if the
+     * entire search result set cannot be produced.
+     *
+     * for example, assume the user adds an additional sensor and the telemetry
+     * is indexed into a new search index.  if that search index is misconfigured,
+     * it can result in partial shard failures.  rather than failing the entire search,
+     * we log the error and allow the results to be returned from shards that
+     * are correctly configured.
+     */
+    int errors = ArrayUtils.getLength(response.getShardFailures());
+    LOG.error("Search resulted in {}/{} shards failing; errors={}, search={}",
+            response.getFailedShards(),
+            response.getTotalShards(),
+            errors,
+            ElasticsearchUtils.toJSON(request).orElse("???"));
+
+    // log each reported failure
+    int failureCount=1;
+    for(ShardSearchFailure fail: response.getShardFailures()) {
+      String msg = String.format(
+              "Shard search failure [%s/%s]; reason=%s, index=%s, shard=%s, status=%s, nodeId=%s",
+              failureCount,
+              errors,
+              ExceptionUtils.getRootCauseMessage(fail.getCause()),
+              fail.index(),
+              fail.shardId(),
+              fail.status(),
+              fail.shard().getNodeId());
+      LOG.error(msg, fail.getCause());
+    }
+  }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
index 4c9933b..f29012a 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
@@ -22,10 +22,15 @@
 import com.google.common.collect.Iterables;
 import org.apache.commons.lang.StringUtils;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.lang.invoke.MethodHandles;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.text.SimpleDateFormat;
@@ -35,11 +40,14 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static java.lang.String.format;
 
 public class ElasticsearchUtils {
 
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private static ThreadLocal<Map<String, SimpleDateFormat>> DATE_FORMAT_CACHE
           = ThreadLocal.withInitial(() -> new HashMap<>());
 
@@ -179,4 +187,48 @@
     }
     throw new IllegalStateException("Unable to read the elasticsearch ips, expected es.ip to be either a list of strings, a string hostname or a host:port string");
   }
+
+  /**
+   * Converts an Elasticsearch SearchRequest to JSON.
+   * @param esRequest The search request.
+   * @return The JSON representation of the SearchRequest.
+   */
+  public static Optional<String> toJSON(org.elasticsearch.action.search.SearchRequest esRequest) {
+    Optional<String> json = Optional.empty();
+
+    if(esRequest != null) {
+      try {
+        json = Optional.of(XContentHelper.convertToJson(esRequest.source(), true));
+
+      } catch (Throwable t) {
+        LOG.error("Failed to convert search request to JSON", t);
+      }
+    }
+
+    return json;
+  }
+
+  /**
+   * Convert a SearchRequest to JSON.
+   * @param request The search request.
+   * @return The JSON representation of the SearchRequest.
+   */
+  public static Optional<String> toJSON(Object request) {
+    Optional<String> json = Optional.empty();
+
+    if(request != null) {
+      try {
+        json = Optional.of(
+                new ObjectMapper()
+                        .writer()
+                        .withDefaultPrettyPrinter()
+                        .writeValueAsString(request));
+
+      } catch (Throwable t) {
+        LOG.error("Failed to convert request to JSON", t);
+      }
+    }
+
+    return json;
+  }
 }
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
new file mode 100644
index 0000000..0a83ee0
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder;
+import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
+import org.elasticsearch.client.AdminClient;
+import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the ElasticsearchColumnMetadata class.
+ */
+public class ElasticsearchColumnMetadataDaoTest {
+
+  /**
+   * @param indices The names of all indices that will exist.
+   * @return An object to test.
+   */
+  public ElasticsearchColumnMetadataDao setup(String[] indices) {
+    return setup(indices, ImmutableOpenMap.of());
+  }
+
+  /**
+   * @param indices The names of all indices that will exist.
+   * @param mappings The index mappings.
+   * @return An object to test.
+   */
+  public ElasticsearchColumnMetadataDao setup(
+          String[] indices,
+          ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings) {
+
+    AdminClient adminClient = mock(AdminClient.class);
+    IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
+    GetIndexRequestBuilder getIndexRequestBuilder = mock(GetIndexRequestBuilder.class);
+    GetIndexResponse getIndexResponse = mock(GetIndexResponse.class);
+    ActionFuture getMappingsActionFuture = mock(ActionFuture.class);
+    GetMappingsResponse getMappingsResponse = mock(GetMappingsResponse.class);
+
+    // setup the mocks so that a set of indices are available to the DAO
+    when(adminClient.indices()).thenReturn(indicesAdminClient);
+    when(indicesAdminClient.prepareGetIndex()).thenReturn(getIndexRequestBuilder);
+    when(getIndexRequestBuilder.setFeatures()).thenReturn(getIndexRequestBuilder);
+    when(getIndexRequestBuilder.get()).thenReturn(getIndexResponse);
+    when(getIndexResponse.getIndices()).thenReturn(indices);
+
+    // setup the mocks so that a set of mappings are available to the DAO
+    when(indicesAdminClient.getMappings(any())).thenReturn(getMappingsActionFuture);
+    when(getMappingsActionFuture.actionGet()).thenReturn(getMappingsResponse);
+    when(getMappingsResponse.getMappings()).thenReturn(mappings);
+
+    return new ElasticsearchColumnMetadataDao(adminClient);
+  }
+
+  @Test
+  public void testGetOneLatestIndex() {
+
+    // setup
+    String[] existingIndices = new String[] {
+            "bro_index_2017.10.03.19",
+            "bro_index_2017.10.03.20",
+            "bro_index_2017.10.03.21",
+            "snort_index_2017.10.03.19",
+            "snort_index_2017.10.03.20",
+            "snort_index_2017.10.03.21"
+    };
+    ElasticsearchColumnMetadataDao dao = setup(existingIndices);
+
+    // get the latest indices
+    List<String> args = Collections.singletonList("bro");
+    String[] actual = dao.getLatestIndices(args);
+
+    // validation
+    String [] expected = new String[] { "bro_index_2017.10.03.21" };
+    assertArrayEquals(expected, actual);
+  }
+
+  @Test
+  public void testGetLatestIndices() {
+    // setup
+    String[] existingIndices = new String[] {
+            "bro_index_2017.10.03.19",
+            "bro_index_2017.10.03.20",
+            "bro_index_2017.10.03.21",
+            "snort_index_2017.10.03.19",
+            "snort_index_2017.10.03.19",
+            "snort_index_2017.10.03.21"
+    };
+    ElasticsearchColumnMetadataDao dao = setup(existingIndices);
+
+    // get the latest indices
+    List<String> args = Arrays.asList("bro", "snort");
+    String[] actual = dao.getLatestIndices(args);
+
+    // validation
+    String [] expected = new String[] { "bro_index_2017.10.03.21", "snort_index_2017.10.03.21" };
+    assertArrayEquals(expected, actual);
+  }
+
+  @Test
+  public void testLatestIndicesWhereNoneExist() {
+
+    // setup - there are no existing indices
+    String[] existingIndices = new String[] {};
+    ElasticsearchColumnMetadataDao dao = setup(existingIndices);
+
+    // get the latest indices
+    List<String> args = Arrays.asList("bro", "snort");
+    String[] actual = dao.getLatestIndices(args);
+
+    // validation
+    String [] expected = new String[] {};
+    assertArrayEquals(expected, actual);
+  }
+}
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
index 7c33018..a6c0aa6 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
@@ -17,98 +17,209 @@
  */
 package org.apache.metron.elasticsearch.dao;
 
-import org.apache.metron.elasticsearch.matcher.SearchRequestMatcher;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
-import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.search.*;
-import org.elasticsearch.action.ActionFuture;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.search.SortField;
+import org.apache.metron.indexing.dao.search.SortOrder;
 import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
-import org.junit.Assert;
-import org.junit.Before;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
 import org.junit.Test;
-import org.mockito.Mock;
+import org.mockito.ArgumentCaptor;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class ElasticsearchDaoTest {
 
-  private IndexDao searchService;
+  private ElasticsearchDao dao;
+  private ElasticsearchRequestSubmitter requestSubmitter;
 
-  @Mock
-  TransportClient client;
+  private void setup(RestStatus status, int maxSearchResults, Map<String, FieldType> metadata) throws Exception {
 
-  @Before
-  public void setUp() throws Exception {
-    client = mock(TransportClient.class);
+    // setup the mock search hits
+    SearchHit hit1 = mock(SearchHit.class);
+    when(hit1.getId()).thenReturn("id1");
+    when(hit1.getSource()).thenReturn(new HashMap<String, Object>(){{ put("field", "value1"); }});
+    when(hit1.getScore()).thenReturn(0.1f);
+
+    SearchHit hit2 = mock(SearchHit.class);
+    when(hit2.getId()).thenReturn("id2");
+    when(hit2.getSource()).thenReturn(new HashMap<String, Object>(){{ put("field", "value2"); }});
+    when(hit2.getScore()).thenReturn(0.2f);
+
+    // search hits
+    SearchHit[] hits = { hit1, hit2 };
+    SearchHits searchHits = mock(SearchHits.class);
+    when(searchHits.getHits()).thenReturn(hits);
+    when(searchHits.getTotalHits()).thenReturn(Integer.toUnsignedLong(hits.length));
+
+    // search response which returns the search hits
+    org.elasticsearch.action.search.SearchResponse response = mock(org.elasticsearch.action.search.SearchResponse.class);
+    when(response.status()).thenReturn(status);
+    when(response.getHits()).thenReturn(searchHits);
+
+    // provides column metadata
+    ColumnMetadataDao columnMetadataDao = mock(ColumnMetadataDao.class);
+    when(columnMetadataDao.getColumnMetadata(any())).thenReturn(metadata);
+
+    // returns the search response
+    requestSubmitter = mock(ElasticsearchRequestSubmitter.class);
+    when(requestSubmitter.submitSearch(any())).thenReturn(response);
+
+    TransportClient client = mock(TransportClient.class);
+
+    // provides configuration
     AccessConfig config = mock(AccessConfig.class);
-    when(config.getMaxSearchResults()).thenReturn(50);
-    searchService = new ElasticsearchDao(client, config);
+    when(config.getMaxSearchResults()).thenReturn(maxSearchResults);
+
+    dao = new ElasticsearchDao(client, columnMetadataDao, requestSubmitter, config);
+  }
+
+  private void setup(RestStatus status, int maxSearchResults) throws Exception {
+    setup(status, maxSearchResults, new HashMap<>());
   }
 
   @Test
-  public void searchShouldProperlyBuildSearchRequest() throws Exception {
+  public void searchShouldSortByGivenFields() throws Exception {
 
-    // setup the mock client
-    SearchHit searchHit1 = mock(SearchHit.class);
-    when(searchHit1.getId()).thenReturn("id1");
-    when(searchHit1.getSource()).thenReturn(new HashMap<String, Object>(){{ put("field", "value1"); }});
-    when(searchHit1.getScore()).thenReturn(0.1f);
+    // setup the column metadata
+    Map<String, FieldType> columnMetadata = new HashMap<>();
+    columnMetadata.put("sortByStringDesc", FieldType.STRING);
+    columnMetadata.put("sortByIntAsc", FieldType.INTEGER);
 
-    SearchHit searchHit2 = mock(SearchHit.class);
-    when(searchHit2.getId()).thenReturn("id2");
-    when(searchHit2.getSource()).thenReturn(new HashMap<String, Object>(){{ put("field", "value2"); }});
-    when(searchHit2.getScore()).thenReturn(0.2f);
-
-    SearchHits searchHits = mock(SearchHits.class);
-    when(searchHits.getHits()).thenReturn(new SearchHit[]{searchHit1, searchHit2});
-    when(searchHits.getTotalHits()).thenReturn(2L);
-
-    org.elasticsearch.action.search.SearchResponse elasticsearchResponse = mock(org.elasticsearch.action.search.SearchResponse.class);
-    when(elasticsearchResponse.getHits()).thenReturn(searchHits);
-
-    ActionFuture actionFuture = mock(ActionFuture.class);
-    when(actionFuture.actionGet()).thenReturn(elasticsearchResponse);
-    when(client.search(any())).thenReturn(actionFuture);
+    // setup the dao
+    setup(RestStatus.OK, 25, columnMetadata);
 
     // "sort by" fields for the search request
-    SortField[] sortFields = {
-            sortBy("sortField1", SortOrder.DESC),
-            sortBy("sortField2", SortOrder.ASC)
+    SortField[] expectedSortFields = {
+            sortBy("sortByStringDesc", SortOrder.DESC),
+            sortBy("sortByIntAsc", SortOrder.ASC),
+            sortBy("sortByUndefinedDesc", SortOrder.DESC)
     };
 
-    // create a search request
+    // create a metron search request
+    final List<String> indices = Arrays.asList("bro", "snort");
     SearchRequest searchRequest = new SearchRequest();
     searchRequest.setSize(2);
-    searchRequest.setIndices(Arrays.asList("bro", "snort"));
+    searchRequest.setIndices(indices);
     searchRequest.setFrom(5);
-    searchRequest.setSort(Arrays.asList(sortFields));
+    searchRequest.setSort(Arrays.asList(expectedSortFields));
     searchRequest.setQuery("some query");
 
-    // submit the search request
-    SearchResponse searchResponse = searchService.search(searchRequest);
+    // submit the metron search request
+    SearchResponse searchResponse = dao.search(searchRequest);
+    assertNotNull(searchResponse);
 
-    // validate
-    String[] expectedIndices = {"bro_index*", "snort_index*"};
-    verify(client).search(argThat(new SearchRequestMatcher(expectedIndices, "some query", 2, 5, sortFields)));
-    assertEquals(2, searchResponse.getTotal());
-    List<SearchResult> actualSearchResults = searchResponse.getResults();
-    assertEquals(2, actualSearchResults.size());
-    assertEquals("id1", actualSearchResults.get(0).getId());
-    assertEquals("value1", actualSearchResults.get(0).getSource().get("field"));
-    assertEquals(0.1f, actualSearchResults.get(0).getScore(), 0.0f);
-    assertEquals("id2", actualSearchResults.get(1).getId());
-    assertEquals("value2", actualSearchResults.get(1).getSource().get("field"));
-    assertEquals(0.2f, actualSearchResults.get(1).getScore(), 0.0f);
-    verifyNoMoreInteractions(client);
+    // capture the elasticsearch search request that was created
+    ArgumentCaptor<org.elasticsearch.action.search.SearchRequest> argument = ArgumentCaptor.forClass(org.elasticsearch.action.search.SearchRequest.class);
+    verify(requestSubmitter).submitSearch(argument.capture());
+    org.elasticsearch.action.search.SearchRequest request = argument.getValue();
+
+    // transform the request to JSON for validation
+    JSONParser parser = new JSONParser();
+    JSONObject json = (JSONObject) parser.parse(ElasticsearchUtils.toJSON(request).orElse("???"));
+
+    // validate the sort fields
+    JSONArray sortFields = (JSONArray) json.get("sort");
+    assertEquals(3, sortFields.size());
+
+    {
+      // sort by string descending
+      JSONObject aSortField = (JSONObject) sortFields.get(0);
+      JSONObject sortBy = (JSONObject) aSortField.get("sortByStringDesc");
+      assertEquals("desc", sortBy.get("order"));
+      assertEquals("_last", sortBy.get("missing"));
+      assertEquals("string", sortBy.get("unmapped_type"));
+    }
+    {
+      // sort by integer ascending
+      JSONObject aSortField = (JSONObject) sortFields.get(1);
+      JSONObject sortByIntAsc = (JSONObject) aSortField.get("sortByIntAsc");
+      assertEquals("asc", sortByIntAsc.get("order"));
+      assertEquals("_first", sortByIntAsc.get("missing"));
+      assertEquals("integer", sortByIntAsc.get("unmapped_type"));
+    }
+    {
+      // sort by unknown type
+      JSONObject aSortField = (JSONObject) sortFields.get(2);
+      JSONObject sortByUndefinedDesc = (JSONObject) aSortField.get("sortByUndefinedDesc");
+      assertEquals("desc", sortByUndefinedDesc.get("order"));
+      assertEquals("_last", sortByUndefinedDesc.get("missing"));
+      assertEquals("other", sortByUndefinedDesc.get("unmapped_type"));
+    }
+  }
+
+  @Test
+  public void searchShouldWildcardIndices() throws Exception {
+
+    // setup the dao
+    setup(RestStatus.OK, 25);
+
+    // "sort by" fields for the search request
+    SortField[] expectedSortFields = {
+            sortBy("sortByStringDesc", SortOrder.DESC),
+            sortBy("sortByIntAsc", SortOrder.ASC),
+            sortBy("sortByUndefinedDesc", SortOrder.DESC)
+    };
+
+    // create a metron search request
+    final List<String> indices = Arrays.asList("bro", "snort");
+    SearchRequest searchRequest = new SearchRequest();
+    searchRequest.setSize(2);
+    searchRequest.setIndices(indices);
+    searchRequest.setFrom(5);
+    searchRequest.setSort(Arrays.asList(expectedSortFields));
+    searchRequest.setQuery("some query");
+
+    // submit the metron search request
+    SearchResponse searchResponse = dao.search(searchRequest);
+    assertNotNull(searchResponse);
+
+    // capture the elasticsearch search request that was created
+    ArgumentCaptor<org.elasticsearch.action.search.SearchRequest> argument = ArgumentCaptor.forClass(org.elasticsearch.action.search.SearchRequest.class);
+    verify(requestSubmitter).submitSearch(argument.capture());
+    org.elasticsearch.action.search.SearchRequest request = argument.getValue();
+
+    // transform the request to JSON for validation
+    JSONParser parser = new JSONParser();
+    JSONObject json = (JSONObject) parser.parse(ElasticsearchUtils.toJSON(request).orElse("???"));
+
+    // ensure that the index names are 'wildcard-ed'
+    String[] expected = { "bro_index*", "snort_index*" };
+    assertArrayEquals(expected, request.indices());
+  }
+
+
+  @Test(expected = InvalidSearchException.class)
+  public void searchShouldThrowExceptionWhenMaxResultsAreExceeded() throws Exception {
+
+    int maxSearchResults = 20;
+    setup(RestStatus.OK, maxSearchResults);
+
+    SearchRequest searchRequest = new SearchRequest();
+    searchRequest.setSize(maxSearchResults+1);
+
+    dao.search(searchRequest);
+    // exception expected - size > max
   }
 
   private SortField sortBy(String field, SortOrder order) {
@@ -118,19 +229,4 @@
     return sortField;
   }
 
-  @Test
-  public void searchShouldThrowExceptionWhenMaxResultsAreExceeded() throws Exception {
-    SearchRequest searchRequest = new SearchRequest();
-    searchRequest.setSize(51);
-    try {
-      searchService.search(searchRequest);
-      Assert.fail("Did not throw expected exception");
-    }
-    catch(InvalidSearchException ise) {
-      Assert.assertEquals("Search result size must be less than 50", ise.getMessage());
-    }
-  }
-
-
-
 }
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
new file mode 100644
index 0000000..26f5fff
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchShardTarget;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ElasticsearchRequestSubmitterTest {
+
+  private ElasticsearchRequestSubmitter submitter;
+
+  public ElasticsearchRequestSubmitter setup(SearchResponse response) {
+
+    // mocks
+    TransportClient client = mock(TransportClient.class);
+    ActionFuture future = Mockito.mock(ActionFuture.class);
+
+    // the client should return the given search response
+    when(client.search(any())).thenReturn(future);
+    when(future.actionGet()).thenReturn(response);
+
+    return new ElasticsearchRequestSubmitter(client);
+  }
+
+  @Test
+  public void searchShouldSucceedWhenOK() throws InvalidSearchException {
+
+    // mocks
+    SearchResponse response = mock(SearchResponse.class);
+    SearchRequest request = mock(SearchRequest.class);
+
+    // response will have status of OK and no failed shards
+    when(response.status()).thenReturn(RestStatus.OK);
+    when(response.getFailedShards()).thenReturn(0);
+    when(response.getTotalShards()).thenReturn(2);
+
+    // search should succeed
+    ElasticsearchRequestSubmitter submitter = setup(response);
+    SearchResponse actual = submitter.submitSearch(request);
+    assertNotNull(actual);
+  }
+
+  @Test(expected = InvalidSearchException.class)
+  public void searchShouldFailWhenNotOK() throws InvalidSearchException {
+
+    // mocks
+    SearchResponse response = mock(SearchResponse.class);
+    SearchRequest request = mock(SearchRequest.class);
+
+    // response will have status of OK
+    when(response.status()).thenReturn(RestStatus.PARTIAL_CONTENT);
+    when(response.getFailedShards()).thenReturn(0);
+    when(response.getTotalShards()).thenReturn(2);
+
+    // search should succeed
+    ElasticsearchRequestSubmitter submitter = setup(response);
+    submitter.submitSearch(request);
+  }
+
+  @Test
+  public void searchShouldHandleShardFailure() throws InvalidSearchException {
+    // mocks
+    SearchResponse response = mock(SearchResponse.class);
+    SearchRequest request = mock(SearchRequest.class);
+    ShardSearchFailure fail = mock(ShardSearchFailure.class);
+    SearchShardTarget target = mock(SearchShardTarget.class);
+
+    // response will have status of OK
+    when(response.status()).thenReturn(RestStatus.OK);
+
+    // the response will report shard failures
+    when(response.getFailedShards()).thenReturn(1);
+    when(response.getTotalShards()).thenReturn(2);
+
+    // the response will return the failures
+    ShardSearchFailure[] failures = { fail };
+    when(response.getShardFailures()).thenReturn(failures);
+
+    // shard failure needs to report the node
+    when(fail.shard()).thenReturn(target);
+    when(target.getNodeId()).thenReturn("node1");
+
+    // shard failure needs to report details of failure
+    when(fail.index()).thenReturn("bro_index_2017-10-11");
+    when(fail.shardId()).thenReturn(1);
+
+    // search should succeed, even with failed shards
+    ElasticsearchRequestSubmitter submitter = setup(response);
+    SearchResponse actual = submitter.submitSearch(request);
+    assertNotNull(actual);
+  }
+}
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
index 07cc708..3d50e99 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
@@ -17,18 +17,11 @@
  */
 package org.apache.metron.elasticsearch.integration;
 
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.concurrent.ExecutionException;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
-import org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao;
 import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.MetaAlertDao;
 import org.apache.metron.indexing.dao.SearchIntegrationTest;
 import org.apache.metron.integration.InMemoryComponent;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -43,7 +36,13 @@
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutionException;
+
 public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest {
+
   private static String indexDir = "target/elasticsearch_search";
   private static String dateFormat = "yyyy.MM.dd.HH";
   private static final int MAX_RETRIES = 10;
@@ -53,19 +52,46 @@
    * {
    * "bro_doc": {
    *   "properties": {
-   *     "source:type": { "type": "string" },
-   *     "ip_src_addr": { "type": "ip" },
-   *     "ip_src_port": { "type": "integer" },
-   *     "long_field": { "type": "long" },
-   *     "timestamp" : { "type": "date" },
-   *     "latitude" : { "type": "float" },
-   *     "score": { "type": "double" },
-   *     "is_alert": { "type": "boolean" },
-   *     "location_point": { "type": "geo_point" },
-   *     "bro_field": { "type": "string" },
-   *     "duplicate_name_field": { "type": "string" }
+   *     "source:type": {
+   *        "type": "string",
+   *        "index": "not_analyzed"
+   *     },
+   *     "ip_src_addr": {
+   *        "type": "ip"
+   *     },
+   *     "ip_src_port": {
+   *        "type": "integer"
+   *     },
+   *     "long_field": {
+   *        "type": "long"
+   *     },
+   *     "timestamp": {
+   *        "type": "date",
+   *        "format": "epoch_millis"
+   *      },
+   *     "latitude" : {
+   *        "type": "float"
+   *      },
+   *     "score": {
+   *        "type": "double"
+   *     },
+   *     "is_alert": {
+   *        "type": "boolean"
+   *     },
+   *     "location_point": {
+   *        "type": "geo_point"
+   *     },
+   *     "bro_field": {
+   *        "type": "string"
+   *     },
+   *     "duplicate_name_field": {
+   *        "type": "string"
+   *     },
+   *     "alert": {
+   *         "type": "nested"
+   *     }
    *   }
-   * }
+   *  }
    * }
    */
   @Multiline
@@ -73,21 +99,51 @@
 
   /**
    * {
-   * "snort_doc": {
-   *   "properties": {
-   *     "source:type": { "type": "string" },
-   *     "ip_src_addr": { "type": "ip" },
-   *     "ip_src_port": { "type": "integer" },
-   *     "long_field": { "type": "long" },
-   *     "timestamp" : { "type": "date" },
-   *     "latitude" : { "type": "float" },
-   *     "score": { "type": "double" },
-   *     "is_alert": { "type": "boolean" },
-   *     "location_point": { "type": "geo_point" },
-   *     "snort_field": { "type": "integer" },
-   *     "duplicate_name_field": { "type": "integer" }
-   *   }
-   * }
+   *  "snort_doc": {
+   *     "properties": {
+   *        "source:type": {
+   *          "type": "string",
+   *          "index": "not_analyzed"
+   *        },
+   *        "ip_src_addr": {
+   *          "type": "ip"
+   *        },
+   *        "ip_src_port": {
+   *          "type": "integer"
+   *        },
+   *        "long_field": {
+   *          "type": "long"
+   *        },
+   *        "timestamp": {
+   *          "type": "date",
+   *          "format": "epoch_millis"
+   *        },
+   *        "latitude" : {
+   *          "type": "float"
+   *        },
+   *        "score": {
+   *          "type": "double"
+   *        },
+   *        "is_alert": {
+   *          "type": "boolean"
+   *        },
+   *        "location_point": {
+   *          "type": "geo_point"
+   *        },
+   *        "snort_field": {
+   *          "type": "integer"
+   *        },
+   *        "duplicate_name_field": {
+   *          "type": "integer"
+   *        },
+   *        "alert": {
+   *           "type": "nested"
+   *        },
+   *        "threat:triage:score": {
+   *           "type": "float"
+   *        }
+   *      }
+   *    }
    * }
    */
   @Multiline
@@ -106,27 +162,23 @@
   @Multiline
   private static String metaAlertTypeMappings;
 
-
   @Override
   protected IndexDao createDao() throws Exception {
-    IndexDao elasticsearchDao = new ElasticsearchDao();
-    elasticsearchDao.init(
-            new AccessConfig() {{
-              setMaxSearchResults(100);
-              setMaxSearchGroups(100);
-              setGlobalConfigSupplier( () ->
-                new HashMap<String, Object>() {{
-                  put("es.clustername", "metron");
-                  put("es.port", "9300");
-                  put("es.ip", "localhost");
-                  put("es.date.format", dateFormat);
-                  }}
-              );
+    AccessConfig config = new AccessConfig();
+    config.setMaxSearchResults(100);
+    config.setMaxSearchGroups(100);
+    config.setGlobalConfigSupplier( () ->
+            new HashMap<String, Object>() {{
+              put("es.clustername", "metron");
+              put("es.port", "9300");
+              put("es.ip", "localhost");
+              put("es.date.format", dateFormat);
             }}
     );
-    MetaAlertDao ret = new ElasticsearchMetaAlertDao();
-    ret.init(elasticsearchDao);
-    return elasticsearchDao;
+
+    IndexDao dao = new ElasticsearchDao();
+    dao.init(config);
+    return dao;
   }
 
   @Override
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/matcher/SearchRequestMatcher.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/matcher/SearchRequestMatcher.java
deleted file mode 100644
index 417e48b..0000000
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/matcher/SearchRequestMatcher.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.elasticsearch.matcher;
-
-import org.apache.metron.indexing.dao.search.SortField;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.client.Requests;
-import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.index.query.QueryStringQueryBuilder;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.search.sort.FieldSortBuilder;
-import org.elasticsearch.search.sort.SortOrder;
-import org.hamcrest.Description;
-import org.mockito.ArgumentMatcher;
-
-import java.util.Arrays;
-
-public class SearchRequestMatcher extends ArgumentMatcher<SearchRequest> {
-
-  private String[] expectedIndices;
-  private String[] actualIndices;
-
-  private BytesReference expectedSource;
-  private BytesReference actualSource;
-
-  private boolean indicesMatch;
-  private boolean sourcesMatch;
-
-  public SearchRequestMatcher(String[] indices, String query, int size, int from, SortField[] sortFields) {
-    expectedIndices = indices;
-    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
-            .size(size)
-            .from(from)
-            .query(new QueryStringQueryBuilder(query))
-            .fetchSource(true)
-            .trackScores(true);
-    for(SortField sortField: sortFields) {
-      FieldSortBuilder fieldSortBuilder = new FieldSortBuilder(sortField.getField());
-      fieldSortBuilder.order(sortField.getSortOrder() == org.apache.metron.indexing.dao.search.SortOrder.DESC ? SortOrder.DESC : SortOrder.ASC);
-      searchSourceBuilder = searchSourceBuilder.sort(fieldSortBuilder);
-    }
-    expectedSource = searchSourceBuilder.buildAsBytes(Requests.CONTENT_TYPE);
-  }
-
-  @Override
-  public boolean matches(Object o) {
-    SearchRequest searchRequest = (SearchRequest) o;
-
-    actualIndices = searchRequest.indices();
-    actualSource = searchRequest.source();
-
-    indicesMatch = Arrays.equals(expectedIndices, actualIndices);
-    sourcesMatch = expectedSource.equals(actualSource);
-
-    return indicesMatch && sourcesMatch;
-  }
-
-  @Override
-  public void describeTo(Description description) {
-    if(!indicesMatch) {
-      description.appendText("Bad search request indices: ");
-      description.appendText(" expected=");
-      description.appendValue(expectedIndices);
-      description.appendText(", got=");
-      description.appendValue(actualIndices);
-      description.appendText("  ");
-    }
-
-    if(!sourcesMatch) {
-      description.appendText("Bad search request sources: ");
-      description.appendText(" expected=");
-      description.appendValue(expectedSource);
-      description.appendText(", got=");
-      description.appendValue(actualSource);
-      description.appendText("  ");
-    }
-  }
-}
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
index 4f47a65..c16401e 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java
@@ -31,8 +31,7 @@
   private TableProvider tableProvider = null;
 
   /**
-   * A supplier which will return the current global config.
-   * @return
+   * @return A supplier which will return the current global config.
    */
   public Supplier<Map<String, Object>> getGlobalConfigSupplier() {
     return globalConfigSupplier;
@@ -43,8 +42,7 @@
   }
 
   /**
-   * The maximum search result.
-   * @return
+   * @return The maximum number of search results.
    */
   public Integer getMaxSearchResults() {
     return maxSearchResults;
@@ -55,8 +53,7 @@
   }
 
   /**
-   * The maximum search groups.
-   * @return
+   * @return The maximum number of search groups.
    */
   public Integer getMaxSearchGroups() {
     return maxSearchGroups;
@@ -67,8 +64,7 @@
   }
 
   /**
-   * Get optional settings for initializing indices.
-   * @return
+   * @return Optional settings for initializing indices.
    */
   public Map<String, String> getOptionalSettings() {
     return optionalSettings;
@@ -79,8 +75,7 @@
   }
 
   /**
-   * Return the table provider to use for NoSql DAOs
-   * @return
+   * @return The table provider to use for NoSql DAOs
    */
   public TableProvider getTableProvider() {
     return tableProvider;
@@ -89,5 +84,4 @@
   public void setTableProvider(TableProvider tableProvider) {
     this.tableProvider = tableProvider;
   }
-
 }
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
index f2108de..002ec28 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
@@ -21,14 +21,34 @@
 import com.google.common.base.Splitter;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Iterables;
-import java.util.Map.Entry;
+import com.google.common.collect.Ordering;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.JSONUtils;
-import org.apache.metron.indexing.dao.search.*;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.search.Group;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
+import org.apache.metron.indexing.dao.search.GroupResult;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.search.SearchResult;
+import org.apache.metron.indexing.dao.search.SortField;
+import org.apache.metron.indexing.dao.search.SortOrder;
 import org.apache.metron.indexing.dao.update.Document;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.UUID;
 
 public class InMemoryDao implements IndexDao {
   // Map from index to list of documents as JSON strings
@@ -102,13 +122,14 @@
 
   private static class ComparableComparator implements Comparator<Comparable>  {
     SortOrder order = null;
+
     public ComparableComparator(SortOrder order) {
       this.order = order;
     }
     @Override
     public int compare(Comparable o1, Comparable o2) {
-      int result = ComparisonChain.start().compare(o1, o2).result();
-      return order == SortOrder.ASC?result:-1*result;
+      int result = ComparisonChain.start().compare(o1, o2, Ordering.natural().nullsLast()).result();
+      return order == SortOrder.ASC ? result : -1*result;
     }
   }
 
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
index 8f32946..4d3ff9b 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
@@ -18,31 +18,31 @@
 package org.apache.metron.indexing.dao;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import java.util.Iterator;
-import java.util.Optional;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.indexing.dao.search.FieldType;
 import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.GroupRequest;
 import org.apache.metron.indexing.dao.search.GroupResponse;
+import org.apache.metron.indexing.dao.search.GroupResult;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
-import org.apache.metron.indexing.dao.search.GroupResult;
 import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.integration.InMemoryComponent;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.junit.*;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 public abstract class SearchIntegrationTest {
   /**
@@ -59,8 +59,8 @@
 
   /**
    * [
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "score": 50.0, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "duplicate_name_field": 1, "guid":"snort_1"},
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "duplicate_name_field": 2, "guid":"snort_2"},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "score": 50.0, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "duplicate_name_field": 1, "guid":"snort_1", "threat:triage:score":"10"},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "duplicate_name_field": 2, "guid":"snort_2", "threat:triage:score":"20"},
    * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "score": 20.0, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, "duplicate_name_field": 3, "guid":"snort_3"},
    * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "score": 50.0, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, "duplicate_name_field": 4, "guid":"snort_4"},
    * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "score": 10.0, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, "duplicate_name_field": 5, "guid":"snort_5"}
@@ -155,6 +155,46 @@
 
   /**
    * {
+   *  "indices": [
+   *    "snort",
+   *    "bro"
+   *  ],
+   * "query": "*",
+   * "from": 0,
+   * "size": 25,
+   * "sort": [
+   *    {
+   *      "field": "threat:triage:score",
+   *      "sortOrder": "asc"
+   *    }
+   *  ]
+   * }
+   */
+  @Multiline
+  public static String sortAscendingWithMissingFields;
+
+  /**
+   * {
+   *  "indices": [
+   *    "snort",
+   *    "bro"
+   *  ],
+   * "query": "*",
+   * "from": 0,
+   * "size": 25,
+   * "sort": [
+   *    {
+   *      "field": "threat:triage:score",
+   *      "sortOrder": "desc"
+   *    }
+   *  ]
+   * }
+   */
+  @Multiline
+  public static String sortDescendingWithMissingFields;
+
+  /**
+   * {
    * "indices": ["bro", "snort"],
    * "query": "*",
    * "from": 4,
@@ -407,6 +447,7 @@
       SearchResponse response = dao.search(request);
       Assert.assertEquals(10, response.getTotal());
       List<SearchResult> results = response.getResults();
+      Assert.assertEquals(10, results.size());
       for(int i = 0;i < 5;++i) {
         Assert.assertEquals("snort", results.get(i).getSource().get("source:type"));
         Assert.assertEquals(10-i, results.get(i).getSource().get("timestamp"));
@@ -461,6 +502,50 @@
         Assert.assertEquals(i, results.get(i-8001).getSource().get("ip_src_port"));
       }
     }
+    //Sort descending with missing fields
+    {
+      SearchRequest request = JSONUtils.INSTANCE.load(sortDescendingWithMissingFields, SearchRequest.class);
+      SearchResponse response = dao.search(request);
+      Assert.assertEquals(10, response.getTotal());
+      List<SearchResult> results = response.getResults();
+      Assert.assertEquals(10, results.size());
+
+      // validate sorted order - there are only 2 with a 'threat:triage:score'
+      Assert.assertEquals("20", results.get(0).getSource().get("threat:triage:score"));
+      Assert.assertEquals("10", results.get(1).getSource().get("threat:triage:score"));
+
+      // the remaining are missing the 'threat:triage:score' and should be sorted last
+      Assert.assertFalse(results.get(2).getSource().containsKey("threat:triage:score"));
+      Assert.assertFalse(results.get(3).getSource().containsKey("threat:triage:score"));
+      Assert.assertFalse(results.get(4).getSource().containsKey("threat:triage:score"));
+      Assert.assertFalse(results.get(5).getSource().containsKey("threat:triage:score"));
+      Assert.assertFalse(results.get(6).getSource().containsKey("threat:triage:score"));
+      Assert.assertFalse(results.get(7).getSource().containsKey("threat:triage:score"));
+      Assert.assertFalse(results.get(8).getSource().containsKey("threat:triage:score"));
+      Assert.assertFalse(results.get(9).getSource().containsKey("threat:triage:score"));
+    }
+    //Sort ascending with missing fields
+    {
+      SearchRequest request = JSONUtils.INSTANCE.load(sortAscendingWithMissingFields, SearchRequest.class);
+      SearchResponse response = dao.search(request);
+      Assert.assertEquals(10, response.getTotal());
+      List<SearchResult> results = response.getResults();
+      Assert.assertEquals(10, results.size());
+
+      // the remaining are missing the 'threat:triage:score' and should be sorted last
+      Assert.assertFalse(results.get(0).getSource().containsKey("threat:triage:score"));
+      Assert.assertFalse(results.get(1).getSource().containsKey("threat:triage:score"));
+      Assert.assertFalse(results.get(2).getSource().containsKey("threat:triage:score"));
+      Assert.assertFalse(results.get(3).getSource().containsKey("threat:triage:score"));
+      Assert.assertFalse(results.get(4).getSource().containsKey("threat:triage:score"));
+      Assert.assertFalse(results.get(5).getSource().containsKey("threat:triage:score"));
+      Assert.assertFalse(results.get(6).getSource().containsKey("threat:triage:score"));
+      Assert.assertFalse(results.get(7).getSource().containsKey("threat:triage:score"));
+
+      // validate sorted order - there are only 2 with a 'threat:triage:score'
+      Assert.assertEquals("10", results.get(8).getSource().get("threat:triage:score"));
+      Assert.assertEquals("20", results.get(9).getSource().get("threat:triage:score"));
+    }
     //pagination test case
     {
       SearchRequest request = JSONUtils.INSTANCE.load(paginationQuery, SearchRequest.class);
@@ -490,13 +575,18 @@
     {
       SearchRequest request = JSONUtils.INSTANCE.load(facetQuery, SearchRequest.class);
       SearchResponse response = dao.search(request);
-      Assert.assertEquals(10, response.getTotal());
+      Assert.assertEquals(12, response.getTotal());
+
       Map<String, Map<String, Long>> facetCounts = response.getFacetCounts();
       Assert.assertEquals(8, facetCounts.size());
+
+      // source:type
       Map<String, Long> sourceTypeCounts = facetCounts.get("source:type");
       Assert.assertEquals(2, sourceTypeCounts.size());
       Assert.assertEquals(new Long(5), sourceTypeCounts.get("bro"));
       Assert.assertEquals(new Long(5), sourceTypeCounts.get("snort"));
+
+      // ip_src_addr
       Map<String, Long> ipSrcAddrCounts = facetCounts.get("ip_src_addr");
       Assert.assertEquals(8, ipSrcAddrCounts.size());
       Assert.assertEquals(new Long(3), ipSrcAddrCounts.get("192.168.1.1"));
@@ -507,6 +597,8 @@
       Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.6"));
       Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.7"));
       Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.8"));
+
+      // ip_src_port
       Map<String, Long> ipSrcPortCounts = facetCounts.get("ip_src_port");
       Assert.assertEquals(10, ipSrcPortCounts.size());
       Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8001"));
@@ -519,10 +611,14 @@
       Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8008"));
       Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8009"));
       Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8010"));
+
+      // long_field
       Map<String, Long> longFieldCounts = facetCounts.get("long_field");
       Assert.assertEquals(2, longFieldCounts.size());
       Assert.assertEquals(new Long(8), longFieldCounts.get("10000"));
       Assert.assertEquals(new Long(2), longFieldCounts.get("20000"));
+
+      // timestamp
       Map<String, Long> timestampCounts = facetCounts.get("timestamp");
       Assert.assertEquals(10, timestampCounts.size());
       Assert.assertEquals(new Long(1), timestampCounts.get("1"));
@@ -535,6 +631,8 @@
       Assert.assertEquals(new Long(1), timestampCounts.get("8"));
       Assert.assertEquals(new Long(1), timestampCounts.get("9"));
       Assert.assertEquals(new Long(1), timestampCounts.get("10"));
+
+      // latitude
       Map<String, Long> latitudeCounts = facetCounts.get("latitude");
       Assert.assertEquals(2, latitudeCounts.size());
       List<String> latitudeKeys = new ArrayList<>(latitudeCounts.keySet());
@@ -543,6 +641,8 @@
       Assert.assertEquals(48.5839, Double.parseDouble(latitudeKeys.get(1)), 0.00001);
       Assert.assertEquals(new Long(2), latitudeCounts.get(latitudeKeys.get(0)));
       Assert.assertEquals(new Long(8), latitudeCounts.get(latitudeKeys.get(1)));
+
+      // score
       Map<String, Long> scoreFieldCounts = facetCounts.get("score");
       Assert.assertEquals(4, scoreFieldCounts.size());
       List<String> scoreFieldKeys = new ArrayList<>(scoreFieldCounts.keySet());
@@ -555,6 +655,8 @@
       Assert.assertEquals(new Long(2), scoreFieldCounts.get(scoreFieldKeys.get(1)));
       Assert.assertEquals(new Long(3), scoreFieldCounts.get(scoreFieldKeys.get(2)));
       Assert.assertEquals(new Long(1), scoreFieldCounts.get(scoreFieldKeys.get(3)));
+
+      // is_alert
       Map<String, Long> isAlertCounts = facetCounts.get("is_alert");
       Assert.assertEquals(2, isAlertCounts.size());
       Assert.assertEquals(new Long(6), isAlertCounts.get("true"));
@@ -568,7 +670,7 @@
         Assert.fail("Exception expected, but did not come.");
       }
       catch(InvalidSearchException ise) {
-        Assert.assertEquals("Could not execute search", ise.getMessage());
+        // success
       }
     }
     //Disabled facet query
@@ -591,7 +693,7 @@
     // getColumnMetadata with multiple indices
     {
       Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Arrays.asList("bro", "snort"));
-      Assert.assertEquals(13, fieldTypes.size());
+      Assert.assertEquals(15, fieldTypes.size());
       Assert.assertEquals(FieldType.STRING, fieldTypes.get("guid"));
       Assert.assertEquals(FieldType.STRING, fieldTypes.get("source:type"));
       Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
@@ -605,25 +707,51 @@
       Assert.assertEquals(FieldType.STRING, fieldTypes.get("bro_field"));
       Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field"));
       Assert.assertEquals(FieldType.OTHER, fieldTypes.get("duplicate_name_field"));
+      Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("threat:triage:score"));
+      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert"));
     }
     // getColumnMetadata with only bro
     {
       Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("bro"));
-      Assert.assertEquals(12, fieldTypes.size());
+      Assert.assertEquals(13, fieldTypes.size());
+      Assert.assertEquals(FieldType.STRING, fieldTypes.get("guid"));
+      Assert.assertEquals(FieldType.STRING, fieldTypes.get("source:type"));
+      Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
+      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
+      Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
+      Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
+      Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
+      Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
+      Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
+      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
       Assert.assertEquals(FieldType.STRING, fieldTypes.get("bro_field"));
+      Assert.assertEquals(FieldType.STRING, fieldTypes.get("duplicate_name_field"));
+      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert"));
     }
     // getColumnMetadata with only snort
     {
       Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("snort"));
-      Assert.assertEquals(12, fieldTypes.size());
+      Assert.assertEquals(14, fieldTypes.size());
       Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field"));
+      Assert.assertEquals(FieldType.STRING, fieldTypes.get("guid"));
+      Assert.assertEquals(FieldType.STRING, fieldTypes.get("source:type"));
+      Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
+      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
+      Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
+      Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
+      Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
+      Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
+      Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
+      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
+      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("duplicate_name_field"));
+      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert"));
     }
     // getColumnMetadata with an index that doesn't exist
     {
       Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("someindex"));
       Assert.assertEquals(0, fieldTypes.size());
     }
-    //Fields query
+     //Fields query
     {
       SearchRequest request = JSONUtils.INSTANCE.load(fieldsQuery, SearchRequest.class);
       SearchResponse response = dao.search(request);
@@ -793,7 +921,7 @@
         Assert.fail("Exception expected, but did not come.");
       }
       catch(InvalidSearchException ise) {
-        Assert.assertEquals("Could not execute search", ise.getMessage());
+        // success
       }
     }
     //Group by IP query