METRON-1355 Convert metron-elasticsearch to new infrastructure (merrimanr) closes apache/metron#941
diff --git a/.travis.yml b/.travis.yml
index 073dc1e..de5f6a9 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -33,6 +33,7 @@
 jdk:
   - oraclejdk8
 before_install:
+  - sudo sysctl -w vm.max_map_count=262144
   - sudo rm /usr/local/bin/docker-compose
   - curl -L https://github.com/docker/compose/releases/download/${DOCKER_COMPOSE_VERSION}/docker-compose-`uname -s`-`uname -m` > docker-compose
   - chmod +x docker-compose
@@ -43,16 +44,19 @@
   - export PATH=$M2_HOME/bin:$PATH
   - npm config set cache $HOME/.npm-cache --global
   - npm config set prefix $HOME/.npm-prefix --global
-  - if [ -f ${DOCKER_METRON_CENTOS} ]; then gunzip -c ${DOCKER_METRON_CENTOS} | docker load; else docker build metron-contrib/metron-docker-e2e/compose/metron-centos/ -t "metron-centos"; fi
+  #- if [ -f ${DOCKER_METRON_CENTOS} ]; then gunzip -c ${DOCKER_METRON_CENTOS} | docker load; else docker build metron-contrib/metron-docker-e2e/compose/metron-centos/ -t "metron-centos"; fi
 
 install:
   - time mvn -q -T 2C -DskipTests clean install
   - cd $E2E_COMPOSE_HOME && docker-compose up -d
-  - if [ ! -f ${DOCKER_METRON_CENTOS} ]; then docker save metron-centos | gzip > ${DOCKER_METRON_CENTOS}; fi
+  #- if [ ! -f ${DOCKER_METRON_CENTOS} ]; then docker save metron-centos | gzip > ${DOCKER_METRON_CENTOS}; fi
   - cd $TRAVIS_BUILD_DIR
 
 script:
-  - time mvn -q -T 2C surefire:test@unit-tests && time mvn -q surefire:test@integration-tests && time mvn -q test --projects metron-interface/metron-config && time dev-utilities/build-utils/verify_licenses.sh
+  #- time mvn -q -T 2C surefire:test@unit-tests && time mvn -q surefire:test@integration-tests && time mvn -q test --projects metron-interface/metron-config && time dev-utilities/build-utils/verify_licenses.sh
+  - metron-contrib/metron-docker-e2e/scripts/wait_for_elasticsearch.sh localhost 9210
+  - time mvn -q -T 2C surefire:test@unit-tests --projects=metron-platform/metron-elasticsearch
+  - time mvn -q surefire:test@integration-tests --projects=metron-platform/metron-elasticsearch
 
 before_cache:
   - rm -rf $HOME/.m2/repository/org/apache/metron
diff --git a/metron-contrib/metron-docker-e2e/README.md b/metron-contrib/metron-docker-e2e/README.md
new file mode 100644
index 0000000..98dff60
--- /dev/null
+++ b/metron-contrib/metron-docker-e2e/README.md
@@ -0,0 +1,94 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+# Metron Docker E2E
+
+Metron Docker E2E is a [Docker Compose](https://docs.docker.com/compose/overview/) application that serves as a backend to integration tests.
+
+Metron Docker E2E includes the following images, customized for Metron:
+
+  - Kafka
+  - Zookeeper
+  - Elasticsearch
+  - Metron REST
+  - Metron UIs
+
+Setup
+-----
+
+Install [Docker for Mac](https://docs.docker.com/docker-for-mac/) or [Docker for Windows](https://docs.docker.com/docker-for-windows/).  The following versions have been tested:
+
+  - Docker version 17.12.0-ce
+  - docker-machine version 0.13.0
+  - docker-compose version 1.18.0
+
+Build Metron from the top level directory with:
+```
+$ cd $METRON_HOME
+$ mvn clean install -DskipTests
+```
+
+Create a Docker machine:
+```
+$ export METRON_DOCKER_E2E_HOME=$METRON_HOME/metron-contrib/metron-docker-e2e
+$ cd $METRON_DOCKER_E2E_HOME
+$ ./scripts/create-docker-machine.sh
+```
+
+This creates a host called "metron-machine".  Any time you want to run Docker commands against this host, first run the following to set the Docker environment variables:
+```
+$ eval "$(docker-machine env metron-machine)"
+```
+
+If you wish to use a local docker-engine install instead, set the `BROKER_IP_ADDR` environment variable to the IP address of your host machine.  This cannot be the loopback address.
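+
+For example, with a hypothetical host address (substitute your machine's actual non-loopback IP):
+```
+$ export BROKER_IP_ADDR=192.168.1.10
+```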
+
+Usage
+-----
+
+Navigate to the compose application root:
+```
+$ cd $METRON_DOCKER_E2E_HOME/compose/
+```
+
+The Metron Docker environment lifecycle is controlled by the [docker-compose](https://docs.docker.com/compose/reference/overview/) command.  The service names can be found in the docker-compose.yml file.  For example, to build and start the environment, run:
+```
+$ eval "$(docker-machine env metron-machine)"
+$ docker-compose up -d
+```
+
+After all services have started, list the containers and ensure their status is 'Up':
+```
+$ docker-compose ps
+         Name                       Command               State                       Ports                     
+----------------------------------------------------------------------------------------------------------------
+metron_elasticsearch_1   /bin/bash bin/es-docker          Up      0.0.0.0:9210->9200/tcp, 0.0.0.0:9310->9300/tcp
+metron_kafka_1           start-kafka.sh                   Up      0.0.0.0:9092->9092/tcp                        
+metron_metron-rest_1     /bin/sh -c ./bin/start.sh        Up      0.0.0.0:8082->8082/tcp                        
+metron_metron-ui_1       /bin/sh -c ./bin/start.sh        Up      0.0.0.0:4201->4201/tcp                        
+metron_zookeeper_1       /docker-entrypoint.sh zkSe ...   Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp    
+```
+
+Various services are exposed over HTTP on the Docker host.  Get the host IP address from the URL property:
+```
+$ docker-machine ls
+NAME             ACTIVE   DRIVER       STATE     URL                         SWARM   DOCKER    ERRORS
+metron-machine   *        virtualbox   Running   tcp://192.168.99.100:2376           v1.12.5
+```
+
+The various integration tests can now be run against this environment.
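+
+As a sketch mirroring the commands this change adds to `.travis.yml`, the Elasticsearch integration tests could be run with (substitute your Docker machine's IP for `localhost` when using docker-machine):
+```
+$ metron-contrib/metron-docker-e2e/scripts/wait_for_elasticsearch.sh localhost 9210
+$ mvn -q surefire:test@integration-tests --projects=metron-platform/metron-elasticsearch
+```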
+
+TODO: document how to set docker machine ip address for e2e tests
diff --git a/metron-contrib/metron-docker-e2e/compose/docker-compose.yml b/metron-contrib/metron-docker-e2e/compose/docker-compose.yml
index e5a8134..69a6073 100644
--- a/metron-contrib/metron-docker-e2e/compose/docker-compose.yml
+++ b/metron-contrib/metron-docker-e2e/compose/docker-compose.yml
@@ -31,7 +31,18 @@
     depends_on:
       - zookeeper
   elasticsearch:
-    image: elasticsearch:2.3
+    image: docker.elastic.co/elasticsearch/elasticsearch:5.6.8
+    environment:
+      - xpack.security.enabled=false
+      - xpack.monitoring.enabled=false
+      - xpack.ml.enabled=false
+      - xpack.graph.enabled=false
+      - xpack.watcher.enabled=false
+      - cluster.name=elasticsearch
+      - bootstrap.memory_lock=true
+      - ES_JAVA_OPTS=-Xms512m -Xmx512m
+      - discovery.zen.minimum_master_nodes=1
+      - discovery.type=single-node
     ports:
       - "9210:9200"
       - "9310:9300"
diff --git a/metron-contrib/metron-docker-e2e/compose/metron-rest/Dockerfile b/metron-contrib/metron-docker-e2e/compose/metron-rest/Dockerfile
index a76cf57..56ddc9f 100644
--- a/metron-contrib/metron-docker-e2e/compose/metron-rest/Dockerfile
+++ b/metron-contrib/metron-docker-e2e/compose/metron-rest/Dockerfile
@@ -14,7 +14,9 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 #
-FROM metron-centos
+FROM centos
+
+RUN yum install -y java-1.8.0-openjdk lsof
 
 ARG METRON_VERSION
 
diff --git a/metron-contrib/metron-docker-e2e/compose/metron-rest/bin/start.sh b/metron-contrib/metron-docker-e2e/compose/metron-rest/bin/start.sh
index f307e16..5a2da33 100755
--- a/metron-contrib/metron-docker-e2e/compose/metron-rest/bin/start.sh
+++ b/metron-contrib/metron-docker-e2e/compose/metron-rest/bin/start.sh
@@ -18,6 +18,6 @@
 #
 $METRON_HOME/bin/zk_load_configs.sh -z zookeeper:2181 -m PUSH -i $METRON_HOME/config/zookeeper
 
-METRON_REST_CLASSPATH="$METRON_HOME/lib/metron-rest-$METRON_VERSION.jar:$METRON_HOME/lib/metron-elasticsearch-$METRON_VERSION-uber.jar"
+METRON_REST_CLASSPATH="$METRON_HOME/lib/metron-rest-$METRON_VERSION.jar:$METRON_HOME/lib/metron-parsers-$METRON_VERSION-uber.jar:$METRON_HOME/lib/metron-elasticsearch-$METRON_VERSION-uber.jar"
 
 java -cp $METRON_REST_CLASSPATH org.apache.metron.rest.MetronRestApplication --spring.config.location=$METRON_HOME/config/application-docker.yml --spring.profiles.active=dev --server.port=8082
diff --git a/metron-contrib/metron-docker-e2e/pom.xml b/metron-contrib/metron-docker-e2e/pom.xml
index ebca4c3..6d5ee6c 100644
--- a/metron-contrib/metron-docker-e2e/pom.xml
+++ b/metron-contrib/metron-docker-e2e/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <groupId>org.apache.metron</groupId>
         <artifactId>metron-contrib</artifactId>
-        <version>0.4.2</version>
+        <version>0.4.3</version>
     </parent>
     <description>Metron Docker</description>
     <url>https://metron.apache.org/</url>
diff --git a/metron-contrib/metron-docker-e2e/scripts/create-docker-machine.sh b/metron-contrib/metron-docker-e2e/scripts/create-docker-machine.sh
new file mode 100755
index 0000000..1465dd9
--- /dev/null
+++ b/metron-contrib/metron-docker-e2e/scripts/create-docker-machine.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
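+# Create a VirtualBox-backed Docker host sized for the e2e compose environment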
+docker-machine create --driver virtualbox --virtualbox-disk-size "30000" --virtualbox-memory "4096" --virtualbox-cpu-count "2" metron-machine
+
+# Necessary for Elasticsearch Docker image: https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html#docker-cli-run-prod-mode
+docker-machine ssh metron-machine sudo sysctl -w vm.max_map_count=262144
diff --git a/metron-contrib/metron-docker-e2e/scripts/wait_for_elasticsearch.sh b/metron-contrib/metron-docker-e2e/scripts/wait_for_elasticsearch.sh
new file mode 100755
index 0000000..ec9647f
--- /dev/null
+++ b/metron-contrib/metron-docker-e2e/scripts/wait_for_elasticsearch.sh
@@ -0,0 +1,29 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
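+# Usage: wait_for_elasticsearch.sh <host> <port>
+# Polls Elasticsearch over HTTP until it responds or MAX_ATTEMPTS is reached.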
+ELASTICSEARCH_HOST=$1
+ELASTICSEARCH_PORT=$2
+DELAY=5
+MAX_ATTEMPTS=24
+COUNTER=0
+while [ $COUNTER -lt $MAX_ATTEMPTS ]; do
+  curl -s --output /dev/null -XGET http://$ELASTICSEARCH_HOST:$ELASTICSEARCH_PORT && echo Elasticsearch is up after waiting "$(($DELAY * $COUNTER))" seconds && break
+  sleep $DELAY
+  let COUNTER=COUNTER+1
+done
+if [ $COUNTER -eq $MAX_ATTEMPTS ]; then echo Could not reach Elasticsearch after "$(($DELAY * $COUNTER))" seconds; fi
diff --git a/metron-interface/metron-alerts/e2e/utils/e2e_util.ts b/metron-interface/metron-alerts/e2e/utils/e2e_util.ts
index 9304a5c..8ae1de1 100644
--- a/metron-interface/metron-alerts/e2e/utils/e2e_util.ts
+++ b/metron-interface/metron-alerts/e2e/utils/e2e_util.ts
@@ -46,28 +46,7 @@
 }
 
 export function loadTestData() {
-  request.delete('http://user:password@' + browser.params.rest.url + '/api/v1/sensor/indexing/config/alerts_ui_e2e', function (e, response, body) {
-    request.post({url:'http://user:password@' + browser.params.rest.url + '/api/v1/sensor/indexing/config/alerts_ui_e2e', json:
-    {
-      "hdfs": {
-        "index": "alerts_ui_e2e",
-        "batchSize": 5,
-        "enabled": true
-      },
-      "elasticsearch": {
-        "index": "alerts_ui_e2e",
-        "batchSize": 5,
-        "enabled": true
-      },
-      "solr": {
-        "index": "alerts_ui_e2e",
-        "batchSize": 5,
-        "enabled": true
-      }
-    }
-    }, function (e, response, body) {
-    });
-  });
+  deleteTestData();
 
   let template = fs.readFileSync('e2e/mock-data/alerts_ui_e2e_index.template', 'utf8');
   request({
@@ -89,9 +68,7 @@
 }
 
 export function deleteTestData() {
-  request.delete('http://' + browser.params.elasticsearch.url + '/alerts_ui_e2e_index*');
-  request.delete('http://user:password@' + browser.params.rest.url + '/api/v1/sensor/indexing/config/alerts_ui_e2e', function (e, response, body) {
-  });
+  request.delete('http://node1:9200/alerts_ui_e2e_index*');
 }
 
 export function createMetaAlertsIndex() {
@@ -108,6 +85,6 @@
 }
 
 export function deleteMetaAlertsIndex() {
-  request.delete('http://' + browser.params.elasticsearch.url + '/metaalert_index*');
+  request.delete('http://node1:9200/metaalert_index*');
 }
 
diff --git a/metron-platform/metron-elasticsearch/README.md b/metron-platform/metron-elasticsearch/README.md
index 1e15018..a7358dc 100644
--- a/metron-platform/metron-elasticsearch/README.md
+++ b/metron-platform/metron-elasticsearch/README.md
@@ -25,6 +25,7 @@
 * [Type Mappings](#type-mappings)
 * [Using Metron with Elasticsearch 5.6.2](#using-metron-with-elasticsearch-562)
 * [Installing Elasticsearch Templates](#installing-elasticsearch-templates)
+* [Integration Testing](#integration-testing)
 
 ## Introduction
 
@@ -332,3 +333,9 @@
 _Method 2_ - Stop the Metron Indexing service, and start it again from Ambari UI. Note that the Metron Indexing service tracks if it has successfully installed the Elasticsearch templates, and will attempt to do so each time it is Started until successful.
 
 > Note: If you have made any customization to your index templates, then installing Elasticsearch templates afresh will lead to overwriting your existing changes. Please exercise caution.
+
+## Integration Testing
+
+Integration tests depend on Elasticsearch running inside a Docker container.  Due to limitations with Docker on Mac, docker-machine must be used to run Docker in a virtual machine.  
+
+If running integration tests in this scenario, the "es.ip" setting in the global config returned by ElasticsearchTestUtils.getGlobalConfig() must be changed from "localhost" to the IP address of your Docker machine.  The IP address can be found by running `docker-machine env metron-machine`.
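+
+For example, a minimal sketch (the address shown is illustrative; yours will differ):
+```
+$ docker-machine env metron-machine | grep DOCKER_HOST
+export DOCKER_HOST="tcp://192.168.99.100:2376"
+```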
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
index 2311a2b..1bb1618 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
@@ -18,19 +18,6 @@
 
 package org.apache.metron.elasticsearch.dao;
 
-import static org.apache.metron.common.Constants.GUID;
-import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
-import static org.elasticsearch.index.query.QueryBuilders.constantScoreQuery;
-import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
-import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
-import static org.elasticsearch.index.query.QueryBuilders.termQuery;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import java.io.IOException;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.stream.Collectors;
-import org.apache.commons.collections4.SetUtils;
 import org.apache.lucene.search.join.ScoreMode;
 import org.apache.metron.common.Constants;
 import org.apache.metron.indexing.dao.AccessConfig;
@@ -51,29 +38,36 @@
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.indexing.dao.update.Document;
-import org.elasticsearch.action.get.GetResponse;
-import org.elasticsearch.action.get.MultiGetItemResponse;
-import org.elasticsearch.action.get.MultiGetRequest.Item;
-import org.elasticsearch.action.get.MultiGetRequestBuilder;
-import org.elasticsearch.action.get.MultiGetResponse;
-import org.elasticsearch.action.index.IndexRequest;
+import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
+import org.apache.metron.indexing.dao.update.PatchRequest;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
-import org.elasticsearch.action.update.UpdateRequest;
-import org.elasticsearch.action.update.UpdateResponse;
-import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.index.query.InnerHitBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.QueryStringQueryBuilder;
 import org.elasticsearch.search.SearchHit;
-import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
-import org.apache.metron.indexing.dao.update.PatchRequest;
-import org.apache.metron.stellar.common.utils.ConversionUtils;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.query.QueryStringQueryBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.metron.common.Constants.GUID;
+import static org.elasticsearch.index.query.QueryBuilders.boolQuery;
+import static org.elasticsearch.index.query.QueryBuilders.constantScoreQuery;
+import static org.elasticsearch.index.query.QueryBuilders.existsQuery;
+import static org.elasticsearch.index.query.QueryBuilders.nestedQuery;
+import static org.elasticsearch.index.query.QueryBuilders.termQuery;
 
 public class ElasticsearchMetaAlertDao implements MetaAlertDao {
 
@@ -200,7 +194,7 @@
 
     // Start a list of updates / inserts we need to run
     Map<Document, Optional<String>> updates = new HashMap<>();
-    updates.put(metaAlert, Optional.of(MetaAlertDao.METAALERTS_INDEX));
+    updates.put(metaAlert, Optional.of(this.index));
 
     try {
       // We need to update the associated alerts with the new meta alerts, making sure existing
@@ -430,7 +424,7 @@
       // Each meta alert needs to be updated with the new alert
       for (Document metaAlert : metaAlerts) {
         replaceAlertInMetaAlert(metaAlert, update);
-        updates.put(metaAlert, Optional.of(METAALERTS_INDEX));
+        updates.put(metaAlert, Optional.of(this.index));
       }
 
       // Run the alert's update
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
index 9e74fb6..d7880d1 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
@@ -19,21 +19,20 @@
 package org.apache.metron.elasticsearch.integration;
 
 import static org.apache.metron.indexing.dao.MetaAlertDao.ALERT_FIELD;
-import static org.apache.metron.indexing.dao.MetaAlertDao.METAALERTS_INDEX;
 import static org.apache.metron.indexing.dao.MetaAlertDao.METAALERT_FIELD;
 import static org.apache.metron.indexing.dao.MetaAlertDao.METAALERT_TYPE;
 import static org.apache.metron.indexing.dao.MetaAlertDao.STATUS_FIELD;
+import static org.apache.metron.indexing.dao.MetaAlertDao.THREAT_FIELD_DEFAULT;
+import static org.apache.metron.indexing.dao.MetaAlertDao.THREAT_SORT_DEFAULT;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Iterables;
-import java.io.File;
+
 import java.io.IOException;
-import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -46,7 +45,8 @@
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
 import org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao;
-import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
+import org.apache.metron.elasticsearch.integration.utils.ElasticsearchTestUtils;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
 import org.apache.metron.indexing.dao.MetaAlertDao;
@@ -66,6 +66,7 @@
 import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
 import org.apache.metron.indexing.dao.update.PatchRequest;
+import org.elasticsearch.client.Client;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -74,20 +75,17 @@
 import org.junit.Test;
 
 public class ElasticsearchMetaAlertIntegrationTest {
-
+  private static final String namespace = ElasticsearchMetaAlertIntegrationTest.class.getSimpleName().toLowerCase();
+  protected static final String METAALERT_INDEX = namespace + "_metaalert_index";
   private static final int MAX_RETRIES = 10;
   private static final int SLEEP_MS = 500;
   private static final String SENSOR_NAME = "test";
-  private static final String INDEX_DIR = "target/elasticsearch_meta";
-  private static final String DATE_FORMAT = "yyyy.MM.dd.HH";
-  private static final String INDEX =
-      SENSOR_NAME + "_index_" + new SimpleDateFormat(DATE_FORMAT).format(new Date());
+  private static final String INDEX = namespace + "_" + SENSOR_NAME + "_index";
   private static final String NEW_FIELD = "new-field";
   private static final String NAME_FIELD = "name";
 
   private static IndexDao esDao;
   private static MetaAlertDao metaDao;
-  private static ElasticSearchComponent es;
 
   /**
    {
@@ -104,7 +102,7 @@
   /**
    {
      "guid": "meta_alert",
-     "index": "metaalert_index",
+     "index": "elasticsearchmetaalertintegrationtest_metaalert_index",
      "patch": [
        {
          "op": "add",
@@ -121,7 +119,7 @@
   /**
    {
      "guid": "meta_alert",
-     "index": "metaalert_index",
+     "index": "elasticsearchmetaalertintegrationtest_metaalert_index",
      "patch": [
        {
          "op": "add",
@@ -143,7 +141,7 @@
   /**
    {
      "guid": "meta_alert",
-     "index": "metaalert_index",
+     "index": "elasticsearchmetaalertintegrationtest_metaalert_index",
      "patch": [
        {
          "op": "add",
@@ -177,6 +175,12 @@
            },
            "alert" : {
              "type" : "nested"
+           },
+           "threat:triage:score": {
+             "type": "float"
+           },
+           "count" : {
+             "type" : "integer"
            }
          }
        }
@@ -185,52 +189,41 @@
   @Multiline
   public static String template;
 
+  private static Client client;
+
   @BeforeClass
   public static void setupBefore() throws Exception {
-    // setup the client
-    es = new ElasticSearchComponent.Builder()
-        .withHttpPort(9211)
-        .withIndexDir(new File(INDEX_DIR))
-        .build();
-    es.start();
+    client = ElasticsearchUtils.getClient(ElasticsearchTestUtils.getGlobalConfig(), null);
+    ElasticsearchTestUtils.clearIndices(client, METAALERT_INDEX, INDEX);
 
     AccessConfig accessConfig = new AccessConfig();
-    Map<String, Object> globalConfig = new HashMap<String, Object>() {
-      {
-        put("es.clustername", "metron");
-        put("es.port", "9300");
-        put("es.ip", "localhost");
-        put("es.date.format", DATE_FORMAT);
-      }
-    };
     accessConfig.setMaxSearchResults(1000);
-    accessConfig.setGlobalConfigSupplier(() -> globalConfig);
+    accessConfig.setGlobalConfigSupplier(ElasticsearchTestUtils::getGlobalConfig);
     accessConfig.setMaxSearchGroups(100);
 
     esDao = new ElasticsearchDao();
     esDao.init(accessConfig);
-    metaDao = new ElasticsearchMetaAlertDao(esDao);
+    metaDao = new ElasticsearchMetaAlertDao(esDao, METAALERT_INDEX, THREAT_FIELD_DEFAULT, THREAT_SORT_DEFAULT);
   }
 
   @Before
   public void setup() throws IOException {
-    es.createIndexWithMapping(METAALERTS_INDEX, MetaAlertDao.METAALERT_DOC, template.replace("%MAPPING_NAME%", "metaalert"));
-    es.createIndexWithMapping(INDEX, "index_doc", template.replace("%MAPPING_NAME%", "index"));
+    client.admin().indices().prepareCreate(METAALERT_INDEX)
+            .addMapping(METAALERT_TYPE + "_doc", template.replace("%MAPPING_NAME%", METAALERT_TYPE)).get();
+    client.admin().indices().prepareCreate(INDEX)
+            .addMapping("index_doc", template.replace("%MAPPING_NAME%", "index")).get();
   }
 
   @AfterClass
   public static void teardown() {
-    if (es != null) {
-      es.stop();
-    }
+    ElasticsearchTestUtils.clearIndices(client, METAALERT_INDEX, INDEX);
   }
 
   @After
   public void reset() {
-    es.reset();
+    ElasticsearchTestUtils.clearIndices(client, METAALERT_INDEX, INDEX);
   }
 
-
   @Test
   public void shouldGetAllMetaAlertsForAlert() throws Exception {
     // Load alerts
@@ -245,7 +238,7 @@
     metaAlerts.add(buildMetaAlert("meta_inactive", MetaAlertStatus.INACTIVE,
         Optional.of(Arrays.asList(alerts.get(0), alerts.get(2)))));
     // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
-    elasticsearchAdd(metaAlerts, METAALERTS_INDEX, MetaAlertDao.METAALERT_TYPE);
+    elasticsearchAdd(metaAlerts, METAALERT_INDEX, MetaAlertDao.METAALERT_TYPE);
 
     // Verify load was successful
     List<GetRequest> createdDocs = metaAlerts.stream().map(metaAlert ->
@@ -352,7 +345,7 @@
     // Load metaAlert
     Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE,
         Optional.of(Collections.singletonList(alerts.get(0))));
-    elasticsearchAdd(Collections.singletonList(metaAlert), METAALERTS_INDEX, METAALERT_TYPE);
+    elasticsearchAdd(Collections.singletonList(metaAlert), METAALERT_INDEX, METAALERT_TYPE);
 
     // Verify load was successful
     findCreatedDocs(Arrays.asList(
@@ -432,7 +425,7 @@
     // Load metaAlert
     Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE,
         Optional.of(Arrays.asList(alerts.get(0), alerts.get(1), alerts.get(2), alerts.get(3))));
-    elasticsearchAdd(Collections.singletonList(metaAlert), METAALERTS_INDEX, METAALERT_TYPE);
+    elasticsearchAdd(Collections.singletonList(metaAlert), METAALERT_INDEX, METAALERT_TYPE);
 
     // Verify load was successful
     findCreatedDocs(Arrays.asList(
@@ -522,7 +515,7 @@
     // Load metaAlert
     Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.INACTIVE,
         Optional.of(Collections.singletonList(alerts.get(0))));
-    elasticsearchAdd(Collections.singletonList(metaAlert), METAALERTS_INDEX, METAALERT_TYPE);
+    elasticsearchAdd(Collections.singletonList(metaAlert), METAALERT_INDEX, METAALERT_TYPE);
 
     // Verify load was successful
     findCreatedDocs(Arrays.asList(
@@ -572,7 +565,7 @@
     Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE,
         Optional.of(childAlerts));
     // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
-    elasticsearchAdd(Collections.singletonList(metaAlert), METAALERTS_INDEX,
+    elasticsearchAdd(Collections.singletonList(metaAlert), METAALERT_INDEX,
         MetaAlertDao.METAALERT_TYPE);
 
     List<GetRequest> requests = new ArrayList<>();
@@ -661,7 +654,7 @@
 
 
     // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
-    elasticsearchAdd(Arrays.asList(activeMetaAlert, inactiveMetaAlert), METAALERTS_INDEX, MetaAlertDao.METAALERT_TYPE);
+    elasticsearchAdd(Arrays.asList(activeMetaAlert, inactiveMetaAlert), METAALERT_INDEX, MetaAlertDao.METAALERT_TYPE);
 
     // Verify load was successful
     findCreatedDocs(Arrays.asList(
@@ -671,7 +664,7 @@
     SearchResponse searchResponse = metaDao.search(new SearchRequest() {
       {
         setQuery("*");
-        setIndices(Collections.singletonList(MetaAlertDao.METAALERT_TYPE));
+        setIndices(Collections.singletonList(namespace + "_metaalert"));
         setFrom(0);
         setSize(5);
         setSort(Collections.singletonList(new SortField() {{
@@ -715,7 +708,7 @@
     Map<String, Object> inactiveMetaAlert = buildMetaAlert("meta_inactive", MetaAlertStatus.INACTIVE,
         Optional.of(Arrays.asList(alerts.get(2), alerts.get(3))));
     // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
-    elasticsearchAdd(Arrays.asList(activeMetaAlert, inactiveMetaAlert), METAALERTS_INDEX, MetaAlertDao.METAALERT_TYPE);
+    elasticsearchAdd(Arrays.asList(activeMetaAlert, inactiveMetaAlert), METAALERT_INDEX, MetaAlertDao.METAALERT_TYPE);
 
     // Verify load was successful
     findCreatedDocs(Arrays.asList(
@@ -851,7 +844,7 @@
     Map<String, Object> inactiveMetaAlert = buildMetaAlert("meta_inactive", MetaAlertStatus.INACTIVE,
         Optional.of(Collections.singletonList(alerts.get(0))));
     // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
-    elasticsearchAdd(Arrays.asList(activeMetaAlert, inactiveMetaAlert), METAALERTS_INDEX, METAALERT_TYPE);
+    elasticsearchAdd(Arrays.asList(activeMetaAlert, inactiveMetaAlert), METAALERT_INDEX, METAALERT_TYPE);
 
     // Verify load was successful
     findCreatedDocs(Arrays.asList(
@@ -865,7 +858,7 @@
       Map<String, Object> message0 = new HashMap<String, Object>(alerts.get(0)) {
         {
           put(NEW_FIELD, "metron");
-          put(MetaAlertDao.THREAT_FIELD_DEFAULT, "10");
+          put(THREAT_FIELD_DEFAULT, "10");
         }
       };
       String guid = "" + message0.get(Constants.GUID);
@@ -951,7 +944,7 @@
     Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE,
         Optional.of(Arrays.asList(alerts.get(0), alerts.get(1))));
     // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically.
-    elasticsearchAdd(Collections.singletonList(metaAlert), METAALERTS_INDEX, MetaAlertDao.METAALERT_TYPE);
+    elasticsearchAdd(Collections.singletonList(metaAlert), METAALERT_INDEX, MetaAlertDao.METAALERT_TYPE);
 
     // Verify load was successful
     findCreatedDocs(Arrays.asList(
@@ -1005,7 +998,7 @@
   protected long getMatchingAlertCount(String fieldName, Object fieldValue) throws IOException, InterruptedException {
     long cnt = 0;
     for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t, Thread.sleep(SLEEP_MS)) {
-      List<Map<String, Object>> docs = es.getAllIndexedDocs(INDEX, SENSOR_NAME + "_doc");
+      List<Map<String, Object>> docs = ElasticsearchTestUtils.getAllIndexedDocs(client, INDEX, SENSOR_NAME + "_doc");
       cnt = docs
           .stream()
           .filter(d -> {
@@ -1019,7 +1012,7 @@
   protected long getMatchingMetaAlertCount(String fieldName, String fieldValue) throws IOException, InterruptedException {
     long cnt = 0;
     for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t, Thread.sleep(SLEEP_MS)) {
-      List<Map<String, Object>> docs = es.getAllIndexedDocs(METAALERTS_INDEX, MetaAlertDao.METAALERT_DOC);
+      List<Map<String, Object>> docs = ElasticsearchTestUtils.getAllIndexedDocs(client, METAALERT_INDEX, MetaAlertDao.METAALERT_DOC);
       cnt = docs
           .stream()
           .filter(d -> {
@@ -1085,7 +1078,7 @@
       Map<String, Object> alerts = new HashMap<>();
       alerts.put(Constants.GUID, guid);
       alerts.put("source:type", SENSOR_NAME);
-      alerts.put(MetaAlertDao.THREAT_FIELD_DEFAULT, i);
+      alerts.put(THREAT_FIELD_DEFAULT, i);
       alerts.put("timestamp", System.currentTimeMillis());
       inputData.add(alerts);
     }
@@ -1113,9 +1106,9 @@
     return metaAlert;
   }
 
-  protected void elasticsearchAdd(List<Map<String, Object>> inputData, String index, String docType)
+  protected void elasticsearchAdd(List<Map<String, Object>> inputData, String index, String sensorType)
       throws IOException {
-    es.add(index, docType, inputData.stream().map(m -> {
+    ElasticsearchTestUtils.add(client, index, sensorType + "_doc", inputData.stream().map(m -> {
           try {
             return JSONUtils.INSTANCE.toJSON(m, true);
           } catch (JsonProcessingException e) {
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
index 7089033..6791277 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
@@ -17,49 +17,26 @@
  */
 package org.apache.metron.elasticsearch.integration;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-import java.util.Map;
 import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
+import org.apache.metron.elasticsearch.integration.utils.ElasticsearchTestUtils;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
 import org.apache.metron.indexing.dao.SearchIntegrationTest;
-import org.apache.metron.indexing.dao.search.GetRequest;
-import org.apache.metron.integration.InMemoryComponent;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.support.WriteRequest;
-import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
-import org.elasticsearch.index.IndexNotFoundException;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.search.SearchHit;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.concurrent.ExecutionException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 
 public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest {
-  private static String host = "localhost";
-  private static String port = "9310";
-  private static String dateFormat = "yyyy.MM.dd.HH";
-  private static final int MAX_RETRIES = 10;
-  private static final int SLEEP_MS = 500;
-
   /**
    * {
    * "searchintegrationtest_bro_doc": {
@@ -186,30 +163,17 @@
   @Multiline
   private static String broDefaultStringMappings;
 
-  private static Map<String, Object> globalConfig;
   private static Client client;
 
   @BeforeClass
   public static void start() {
-    globalConfig = new HashMap<String, Object>() {{
-      put("es.clustername", "elasticsearch");
-      put("es.port", port);
-      put("es.ip", host);
-      put("es.date.format", dateFormat);
-    }};
-    client = ElasticsearchUtils.getClient(globalConfig, null);
-    clearIndices();
+    client = ElasticsearchUtils.getClient(ElasticsearchTestUtils.getGlobalConfig(), null);
+    ElasticsearchTestUtils.clearIndices(client, broIndex, snortIndex);
   }
 
   @AfterClass
   public static void stop() throws Exception {
-    clearIndices();
-  }
-
-  private static void clearIndices() {
-    try {
-      client.admin().indices().prepareDelete(broIndex, snortIndex, metaAlertIndex).get();
-    } catch (IndexNotFoundException infe) {}
+    ElasticsearchTestUtils.clearIndices(client, broIndex, snortIndex);
   }
 
   @Override
@@ -217,7 +181,7 @@
     AccessConfig config = new AccessConfig();
     config.setMaxSearchResults(100);
     config.setMaxSearchGroups(100);
-    config.setGlobalConfigSupplier( () -> globalConfig);
+    config.setGlobalConfigSupplier(ElasticsearchTestUtils::getGlobalConfig);
 
     IndexDao dao = new ElasticsearchDao();
     dao.init(config);
@@ -226,33 +190,32 @@
 
   @Override
   protected void loadTestData()
-      throws ParseException, IOException, ExecutionException, InterruptedException {
+      throws ParseException {
     client.admin().indices().prepareCreate(broIndex)
             .addMapping(broType, broTypeMappings).get();
     client.admin().indices().prepareCreate(snortIndex)
             .addMapping(snortType, snortTypeMappings).get();
 
-    BulkRequestBuilder bulkRequest = client.prepareBulk().setRefresh(true);
-    JSONArray broArray = (JSONArray) new JSONParser().parse(broData);
-    for(Object o: broArray) {
-      JSONObject jsonObject = (JSONObject) o;
-      IndexRequestBuilder indexRequestBuilder = client.prepareIndex(broIndex, broType);
-      indexRequestBuilder = indexRequestBuilder.setSource(jsonObject.toJSONString());
-      indexRequestBuilder = indexRequestBuilder.setTimestamp(jsonObject.get("timestamp").toString());
-      bulkRequest.add(indexRequestBuilder);
-    }
-    JSONArray snortArray = (JSONArray) new JSONParser().parse(snortData);
-    for(Object o: snortArray) {
-      JSONObject jsonObject = (JSONObject) o;
-      IndexRequestBuilder indexRequestBuilder = client.prepareIndex(snortIndex, snortType);
-      indexRequestBuilder = indexRequestBuilder.setSource(jsonObject.toJSONString());
-      indexRequestBuilder = indexRequestBuilder.setTimestamp(jsonObject.get("timestamp").toString());
-      bulkRequest.add(indexRequestBuilder);
-    }
+    BulkRequestBuilder bulkRequest = client.prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
+    addIndexRequest(bulkRequest, broData, broIndex, broType);
+    addIndexRequest(bulkRequest, snortData, snortIndex, snortType);
     BulkResponse bulkResponse = bulkRequest.execute().actionGet();
     if (bulkResponse.hasFailures()) {
       throw new RuntimeException("Failed to index test data");
     }
   }
 
+  private void addIndexRequest(BulkRequestBuilder bulkRequest, String data, String index, String docType) throws ParseException {
+    JSONArray dataArray = (JSONArray) new JSONParser().parse(data);
+    for(Object o: dataArray) {
+      JSONObject jsonObject = (JSONObject) o;
+      IndexRequestBuilder indexRequestBuilder = client.prepareIndex(index, docType);
+      indexRequestBuilder = indexRequestBuilder.setId((String) jsonObject.get("guid"));
+      indexRequestBuilder = indexRequestBuilder.setSource(jsonObject.toJSONString());
+      indexRequestBuilder = indexRequestBuilder.setTimestamp(jsonObject.get("timestamp").toString());
+      bulkRequest.add(indexRequestBuilder);
+    }
+  }
+
+
 }
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
index 325d42e..3c7831b 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
@@ -18,7 +18,7 @@
 package org.apache.metron.elasticsearch.integration;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.collect.Iterables;
+import org.adrianwalker.multilinestring.Multiline;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Get;
@@ -26,38 +26,59 @@
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
-import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
+import org.apache.metron.elasticsearch.integration.utils.ElasticsearchTestUtils;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.hbase.mock.MockHTable;
 import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.apache.metron.indexing.dao.*;
 import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.indexing.dao.update.ReplaceRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.client.Client;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.File;
-import java.text.SimpleDateFormat;
 import java.util.*;
+import java.util.stream.Collectors;
 
 
 public class ElasticsearchUpdateIntegrationTest {
+  private static final String namespace = ElasticsearchUpdateIntegrationTest.class.getSimpleName().toLowerCase();
   private static final int MAX_RETRIES = 10;
   private static final int SLEEP_MS = 500;
   private static final String SENSOR_NAME= "test";
   private static final String TABLE_NAME = "modifications";
   private static final String CF = "p";
-  private static String indexDir = "target/elasticsearch_mutation";
-  private static String dateFormat = "yyyy.MM.dd.HH";
-  private static String index = SENSOR_NAME + "_index_" + new SimpleDateFormat(dateFormat).format(new Date());
+  private static String index = namespace + "_" + SENSOR_NAME + "_index";
   private static MockHTable table;
   private static IndexDao esDao;
   private static IndexDao hbaseDao;
   private static MultiIndexDao dao;
-  private static ElasticSearchComponent es;
+  private static Client client;
+
+  /**
+   * {
+       "test_doc" : {
+         "properties" : {
+           "guid" : {
+             "type" : "keyword"
+           },
+           "ip_src_addr" : {
+             "type" : "keyword"
+           },
+           "score" : {
+             "type" : "integer"
+           },
+           "alert" : {
+             "type" : "nested"
+           }
+         }
+       }
+     }
+   */
+  @Multiline
+  public static String testTypeMappings;
 
   @BeforeClass
   public static void setup() throws Exception {
@@ -66,23 +87,15 @@
     tableProvider.addToCache(TABLE_NAME, CF);
     table = (MockHTable)tableProvider.getTable(config, TABLE_NAME);
     // setup the client
-    es = new ElasticSearchComponent.Builder()
-            .withHttpPort(9211)
-            .withIndexDir(new File(indexDir))
-            .build();
-    es.start();
+    client = ElasticsearchUtils.getClient(ElasticsearchTestUtils.getGlobalConfig(), null);
+    client.admin().indices().prepareCreate(index).addMapping("test_doc", testTypeMappings).get();
 
     hbaseDao = new HBaseDao();
     AccessConfig accessConfig = new AccessConfig();
     accessConfig.setTableProvider(tableProvider);
-    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
-      put("es.clustername", "metron");
-      put("es.port", "9300");
-      put("es.ip", "localhost");
-      put("es.date.format", dateFormat);
-      put(HBaseDao.HBASE_TABLE, TABLE_NAME);
-      put(HBaseDao.HBASE_CF, CF);
-    }};
+    Map<String, Object> globalConfig = ElasticsearchTestUtils.getGlobalConfig();
+    globalConfig.put(HBaseDao.HBASE_TABLE, TABLE_NAME);
+    globalConfig.put(HBaseDao.HBASE_CF, CF);
     accessConfig.setGlobalConfigSupplier(() -> globalConfig);
 
     esDao = new ElasticsearchDao();
@@ -94,9 +107,7 @@
 
   @AfterClass
   public static void teardown() {
-    if(es != null) {
-      es.stop();
-    }
+    ElasticsearchTestUtils.clearIndices(client, index);
   }
 
 
@@ -115,20 +126,18 @@
               }}
                              );
     }
-    es.add(index, SENSOR_NAME
-          , Iterables.transform(inputData,
-                    m -> {
-                      try {
-                        return JSONUtils.INSTANCE.toJSON(m, true);
-                      } catch (JsonProcessingException e) {
-                        throw new IllegalStateException(e.getMessage(), e);
-                      }
-                    }
-                    )
-    );
+    ElasticsearchTestUtils.add(client, index, SENSOR_NAME + "_doc"
+            , inputData.stream().map(m -> {
+              try {
+                return JSONUtils.INSTANCE.toJSON(m, true);
+              } catch (JsonProcessingException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+              }
+            }).collect(Collectors.toList()));
+
     List<Map<String,Object>> docs = null;
     for(int t = 0;t < MAX_RETRIES;++t, Thread.sleep(SLEEP_MS)) {
-      docs = es.getAllIndexedDocs(index, SENSOR_NAME + "_doc");
+      docs = ElasticsearchTestUtils.getAllIndexedDocs(client, index, SENSOR_NAME + "_doc");
       if(docs.size() >= 10) {
         break;
       }
@@ -164,7 +173,7 @@
         //ensure ES is up-to-date
         long cnt = 0;
         for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t, Thread.sleep(SLEEP_MS)) {
-          docs = es.getAllIndexedDocs(index, SENSOR_NAME + "_doc");
+          docs = ElasticsearchTestUtils.getAllIndexedDocs(client, index, SENSOR_NAME + "_doc");
           cnt = docs
                   .stream()
                   .filter(d -> message0.get("new-field").equals(d.get("new-field")))
@@ -204,7 +213,7 @@
         //ensure ES is up-to-date
         long cnt = 0;
         for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t,Thread.sleep(SLEEP_MS)) {
-          docs = es.getAllIndexedDocs(index, SENSOR_NAME + "_doc");
+          docs = ElasticsearchTestUtils.getAllIndexedDocs(client, index, SENSOR_NAME + "_doc");
           cnt = docs
                   .stream()
                   .filter(d -> message0.get("new-field").equals(d.get("new-field")))
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/utils/ElasticsearchTestUtils.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/utils/ElasticsearchTestUtils.java
new file mode 100644
index 0000000..08c870f
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/utils/ElasticsearchTestUtils.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.integration.utils;
+
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.utils.JSONUtils;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.search.SearchHit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class ElasticsearchTestUtils {
+
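+  /** Global config pointing at the Elasticsearch instance exposed by the docker-compose environment (transport port 9310). */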
+  public static Map<String, Object> getGlobalConfig() {
+    return new HashMap<String, Object>() {{
+      put("es.clustername", "elasticsearch");
+      put("es.port", "9310");
+      put("es.ip", "localhost");
+      put("es.date.format", "yyyy.MM.dd.HH");
+    }};
+  }
+
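+  /** Deletes the given indices, ignoring any that do not exist. */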
+  public static void clearIndices(Client client, String... indices) {
+    for (String index: indices) {
+      try {
+        client.admin().indices().prepareDelete(index).get();
+      } catch (IndexNotFoundException infe) {
+      }
+    }
+  }
+
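+  /** Refreshes the indices and returns up to 1000 documents of the given type from the given index. */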
+  public static List<Map<String, Object>> getAllIndexedDocs(Client client, String index, String sourceType) throws IOException {
+    client.admin().indices().refresh(new RefreshRequest());
+    SearchResponse response = client.prepareSearch(index)
+            .setTypes(sourceType)
+            .setFrom(0)
+            .setSize(1000)
+            .execute().actionGet();
+    List<Map<String, Object>> ret = new ArrayList<Map<String, Object>>();
+    for (SearchHit hit : response.getHits()) {
+      Object o = hit.getSource();
+      ret.add((Map<String, Object>) (o));
+    }
+    return ret;
+  }
+
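+  /** Bulk-indexes the given JSON documents, using each document's "guid" as the document id and its "timestamp" if present. */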
+  public static BulkResponse add(Client client, String indexName, String docType, Iterable<String> docs)
+          throws IOException {
+    BulkRequestBuilder bulkRequest = client.prepareBulk();
+    for (String doc : docs) {
+      IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName, docType);
+
+      indexRequestBuilder = indexRequestBuilder.setSource(doc);
+      Map<String, Object> esDoc = JSONUtils.INSTANCE
+              .load(doc, JSONUtils.MAP_SUPPLIER);
+      indexRequestBuilder.setId((String) esDoc.get(Constants.GUID));
+      Object ts = esDoc.get("timestamp");
+      if (ts != null) {
+        indexRequestBuilder = indexRequestBuilder.setTimestamp(ts.toString());
+      }
+      bulkRequest.add(indexRequestBuilder);
+    }
+
+    BulkResponse response = bulkRequest.execute().actionGet();
+    if (response.hasFailures()) {
+      throw new IOException(response.buildFailureMessage());
+    }
+    return response;
+  }
+}
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
index d7aa7c7..511d09f 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
@@ -1,4 +1,3 @@
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -50,10 +49,8 @@
   private static final String namespace = SearchIntegrationTest.class.getSimpleName().toLowerCase();
   protected static final String broIndex = namespace + "_bro_index";
   protected static final String snortIndex = namespace + "_snort_index";
-  protected static final String metaAlertIndex = namespace + "_metaalert_index";
   protected static final String broType = namespace + "_bro_doc";
   protected static final String snortType = namespace + "_snort_doc";
-  protected static final String metaAlertType = namespace + "_metaalert_doc";
 
   /**
    * [
@@ -113,7 +110,7 @@
    * },
    * {
    * "guid": "snort_2",
-   * "sensorType": "searchintegrationtest_bro"
+   * "sensorType": "searchintegrationtest_snort"
    * }
    * ]
    */
@@ -543,7 +540,6 @@
     for (int i = 2; i < 10; i++) {
       Assert.assertFalse(results.get(i).getSource().containsKey("threat:triage:score"));
     }
-<<<<<<< HEAD
   }
 
   @Test
@@ -570,27 +566,6 @@
     for (int i = 5, j = 0; i > 0; i--, j++) {
       Assert.assertEquals("bro", results.get(j).getSource().get("source:type"));
       Assert.assertEquals(i, results.get(j).getSource().get("timestamp"));
-=======
-    // getColumnMetadata with multiple indices
-    {
-      Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Arrays.asList("searchintegrationtest_bro", "searchintegrationtest_snort"));
-      Assert.assertEquals(15, fieldTypes.size());
-      Assert.assertEquals(FieldType.STRING, fieldTypes.get("guid"));
-      Assert.assertEquals(FieldType.STRING, fieldTypes.get("source:type"));
-      Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
-      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
-      Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
-      Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
-      Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
-      Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
-      Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
-      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
-      Assert.assertEquals(FieldType.STRING, fieldTypes.get("bro_field"));
-      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field"));
-      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("duplicate_name_field"));
-      Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("threat:triage:score"));
-      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert"));
->>>>>>> upstream/feature/METRON-1344-test-infrastructure
     }
   }
 
@@ -694,7 +669,7 @@
 
   @Test
   public void returns_column_data_for_multiple_indices() throws Exception {
-    Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Arrays.asList("bro", "snort"));
+    Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Arrays.asList("searchintegrationtest_bro", "searchintegrationtest_snort"));
     Assert.assertEquals(15, fieldTypes.size());
     Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid"));
     Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type"));