METRON-1834: Migrate Elasticsearch from TransportClient to new Java REST API (mmiklavc via mmiklavc)
diff --git a/Upgrading.md b/Upgrading.md
index a0dd5d3..1da43fa 100644
--- a/Upgrading.md
+++ b/Upgrading.md
@@ -21,6 +21,13 @@
 
 ## 0.6.0 to 0.6.1
 
+### [METRON-1834: Migrate Elasticsearch from TransportClient to new Java REST API](https://issues.apache.org/jira/browse/METRON-1834)
+The Elasticsearch Java client has now been migrated from TransportClient to the new Java REST client. The motivation for this change
+is that TransportClient will be deprecated in Elasticsearch 7.0 and removed entirely in 8.0. See [ES Java API](https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.6/client.html) for more details.
+The primary client-facing change for upgrades will be the new properties for configuring the new client. An explanation of the new properties
+as well as a mapping from the old properties to the new ones can be found in [metron-elasticsearch](metron-platform/metron-elasticsearch/README.md#Properties) under `es.client.settings`.
+
+
 ### [METRON-1855: Make unified enrichment topology the default and deprecate split-join](https://issues.apache.org/jira/browse/METRON-1855)
 The unified enrichment topology will be the new default in this release,
 and the split-join enrichment topology is now considered deprecated.
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index 7bc0e9b..bdc2508 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -487,3 +487,5 @@
 com.zaxxer:HikariCP:jar:2.7.8:compile,ASLv2,https://github.com/brettwooldridge/HikariCP
 org.hibernate.validator:hibernate-validator:jar:6.0.9.Final:compile,ASLv2,https://github.com/hibernate/hibernate-validator
 com.github.palindromicity:simple-syslog-5424:jar:0.0.9:compile,ASLv2,https://github.com/palindromicity/simple-syslog-5424
+org.elasticsearch.client:elasticsearch-rest-high-level-client:jar:5.6.2:compile,ASLv2,https://github.com/elastic/elasticsearch/blob/master/LICENSE.txt
+org.elasticsearch.plugin:aggs-matrix-stats-client:jar:5.6.2:compile,ASLv2,https://github.com/elastic/elasticsearch/blob/master/LICENSE.txt
diff --git a/metron-deployment/Kerberos-manual-setup.md b/metron-deployment/Kerberos-manual-setup.md
index 4bd4516..6d40552 100644
--- a/metron-deployment/Kerberos-manual-setup.md
+++ b/metron-deployment/Kerberos-manual-setup.md
@@ -563,17 +563,16 @@
     sudo -u hdfs hdfs dfs -chown metron:metron /apps/metron/elasticsearch/xpack-password
     ```
 
-1. New settings have been added to configure the Elasticsearch client. By default the client will run as the normal ES prebuilt transport client. If you enable X-Pack you should set the es.client.class as shown below.
+1. New settings have been added to configure the Elasticsearch client.
 
-    Add the `es.client.settings` to global.json
+    Modify the `es.client.settings` key in global.json
 
     ```
     $METRON_HOME/config/zookeeper/global.json ->
 
       "es.client.settings" : {
-          "es.client.class" : "org.elasticsearch.xpack.client.PreBuiltXPackTransportClient",
-          "es.xpack.username" : "transport_client_user",
-          "es.xpack.password.file" : "/apps/metron/elasticsearch/xpack-password"
+          "xpack.username" : "transport_client_user",
+          "xpack.password.file" : "/apps/metron/elasticsearch/xpack-password"
       }
     ```
 
@@ -583,151 +582,6 @@
     $METRON_HOME/bin/zk_load_configs.sh -m PUSH -i $METRON_HOME/config/zookeeper/ -z $ZOOKEEPER
     ```
 
-1. The last step before restarting the topology is to create a custom X-Pack shaded and relocated jar. This is up to you because of licensing restrictions, but here is a sample Maven pom file that should help.
-
-    ```
-    <?xml version="1.0" encoding="UTF-8"?>
-    <!--
-      Licensed to the Apache Software
-    	Foundation (ASF) under one or more contributor license agreements. See the
-    	NOTICE file distributed with this work for additional information regarding
-    	copyright ownership. The ASF licenses this file to You under the Apache License,
-    	Version 2.0 (the "License"); you may not use this file except in compliance
-    	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
-    	Unless required by applicable law or agreed to in writing, software distributed
-    	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
-    	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
-      the specific language governing permissions and limitations under the License.
-      -->
-    <project xmlns="http://maven.apache.org/POM/4.0.0"
-             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-        <modelVersion>4.0.0</modelVersion>
-        <groupId>org.elasticsearch</groupId>
-        <artifactId>elasticsearch-xpack-shaded</artifactId>
-        <name>elasticsearch-xpack-shaded</name>
-        <packaging>jar</packaging>
-        <version>5.6.2</version>
-        <repositories>
-            <repository>
-                <id>elasticsearch-releases</id>
-                <url>https://artifacts.elastic.co/maven</url>
-                <releases>
-                    <enabled>true</enabled>
-                </releases>
-                <snapshots>
-                    <enabled>false</enabled>
-                </snapshots>
-            </repository>
-        </repositories>
-        <dependencies>
-            <dependency>
-                <groupId>org.elasticsearch.client</groupId>
-                <artifactId>x-pack-transport</artifactId>
-                <version>5.6.2</version>
-                <exclusions>
-                  <exclusion>
-                    <groupId>com.fasterxml.jackson.dataformat</groupId>
-                    <artifactId>jackson-dataformat-yaml</artifactId>
-                  </exclusion>
-                  <exclusion>
-                    <groupId>com.fasterxml.jackson.dataformat</groupId>
-                    <artifactId>jackson-dataformat-cbor</artifactId>
-                  </exclusion>
-                  <exclusion>
-                    <groupId>com.fasterxml.jackson.core</groupId>
-                    <artifactId>jackson-core</artifactId>
-                  </exclusion>
-                  <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-api</artifactId>
-                  </exclusion>
-                  <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>slf4j-log4j12</artifactId>
-                  </exclusion>
-                  <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                  </exclusion>
-                </exclusions>
-              </dependency>
-        </dependencies>
-        <build>
-            <plugins>
-                <plugin>
-                    <groupId>org.apache.maven.plugins</groupId>
-                    <artifactId>maven-shade-plugin</artifactId>
-                    <version>3.2.0</version>
-                    <configuration>
-                        <createDependencyReducedPom>true</createDependencyReducedPom>
-                    </configuration>
-                    <executions>
-                        <execution>
-                            <phase>package</phase>
-                            <goals>
-                                <goal>shade</goal>
-                            </goals>
-                            <configuration>
-                              <filters>
-                                <filter>
-                                  <artifact>*:*</artifact>
-                                  <excludes>
-                                    <exclude>META-INF/*.SF</exclude>
-                                    <exclude>META-INF/*.DSA</exclude>
-                                    <exclude>META-INF/*.RSA</exclude>
-                                  </excludes>
-                                </filter>
-                              </filters>
-                              <relocations>
-    				<relocation>
-                                        <pattern>io.netty</pattern>
-                                        <shadedPattern>org.apache.metron.io.netty</shadedPattern>
-                                    </relocation>
-                                    <relocation>
-                                        <pattern>org.apache.logging.log4j</pattern>
-                                        <shadedPattern>org.apache.metron.logging.log4j</shadedPattern>
-                                    </relocation>
-                                </relocations>
-                                <artifactSet>
-                                    <excludes>
-                                        <exclude>org.slf4j.impl*</exclude>
-                                        <exclude>org.slf4j:slf4j-log4j*</exclude>
-                                    </excludes>
-                                </artifactSet>
-                                <transformers>
-                                    <transformer
-                                      implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
-                                         <resources>
-                                            <resource>.yaml</resource>
-                                            <resource>LICENSE.txt</resource>
-                                            <resource>ASL2.0</resource>
-                                            <resource>NOTICE.txt</resource>
-                                          </resources>
-                                    </transformer>
-                                    <transformer
-                                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
-                                    <transformer
-                                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                        <mainClass></mainClass>
-                                    </transformer>
-                                </transformers>
-                            </configuration>
-                        </execution>
-                    </executions>
-                </plugin>
-            </plugins>
-        </build>
-    </project>
-    ```
-
-1. Once you've built the `elasticsearch-xpack-shaded-5.6.2.jar`, it needs to be made available to Storm when you submit the topology. Create a contrib directory for indexing and put the jar file in this directory.
-
-    ```
-    mkdir $METRON_HOME/indexing_contrib
-    cp elasticsearch-xpack-shaded-5.6.2.jar $METRON_HOME/indexing_contrib/elasticsearch-xpack-shaded-5.6.2.jar
-    ```
-
 1. Now you can restart the Elasticsearch topology. Note, you should perform this step manually, as follows.
 
     ```
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
index 81dda6c..e644b31 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
@@ -95,15 +95,6 @@
         </value-attributes>
     </property>
     <property>
-        <name>es_binary_port</name>
-        <value>9300</value>
-        <description>Elasticsearch binary port. (9300)</description>
-        <display-name>Elasticsearch Binary Port</display-name>
-        <value-attributes>
-            <empty-value-valid>true</empty-value-valid>
-        </value-attributes>
-    </property>
-    <property>
         <name>es_http_port</name>
         <value>9200</value>
         <description>Elasticsearch HTTP port. (9200)</description>
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
index 9d15e93..a7074da 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/metron_service.py
@@ -583,8 +583,6 @@
         missing.append("metron-env/es_cluster_name")
       if not config['configurations']['metron-env']['es_hosts']:
         missing.append("metron-env/es_hosts")
-      if not config['configurations']['metron-env']['es_binary_port']:
-        missing.append("metron-env/es_binary_port")
       if not config['configurations']['metron-env']['es_date_format']:
         missing.append("metron-env/es_date_format")
 
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index 060dfe4..5635330 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -106,9 +106,8 @@
 es_cluster_name = config['configurations']['metron-env']['es_cluster_name']
 es_hosts = config['configurations']['metron-env']['es_hosts']
 es_host_list = es_hosts.split(",")
-es_binary_port = config['configurations']['metron-env']['es_binary_port']
-es_url = ",".join([host + ":" + es_binary_port for host in es_host_list])
 es_http_port = config['configurations']['metron-env']['es_http_port']
+es_url = ",".join([host + ":" + es_http_port for host in es_host_list])
 es_http_url = es_host_list[0] + ":" + es_http_port
 es_date_format = config['configurations']['metron-env']['es_date_format']
 
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
index 46c06dd..7f84f1d 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
@@ -489,10 +489,6 @@
           "subsection-name": "subsection-index-settings"
         },
         {
-          "config": "metron-env/es_binary_port",
-          "subsection-name": "subsection-index-settings"
-        },
-        {
           "config": "metron-env/es_http_port",
           "subsection-name": "subsection-index-settings"
         },
@@ -1006,12 +1002,6 @@
         }
       },
       {
-        "config": "metron-env/es_binary_port",
-        "widget": {
-          "type": "text-field"
-        }
-      },
-      {
         "config": "metron-env/es_http_port",
         "widget": {
           "type": "text-field"
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
index bd8419f..7581ef3 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java
@@ -59,7 +59,7 @@
   public SearchResponse getAllMetaAlertsForAlert(String guid) throws RestException {
     try {
       return dao.getAllMetaAlertsForAlert(guid);
-    } catch (InvalidSearchException ise) {
+    } catch (IOException|InvalidSearchException ise) {
       throw new RestException(ise.getMessage(), ise);
     }
   }
diff --git a/metron-platform/elasticsearch-shaded/pom.xml b/metron-platform/elasticsearch-shaded/pom.xml
index ccad3cb..7766e3d 100644
--- a/metron-platform/elasticsearch-shaded/pom.xml
+++ b/metron-platform/elasticsearch-shaded/pom.xml
@@ -30,40 +30,15 @@
             <version>18.0</version>
         </dependency>
         <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-common</artifactId>
+            <version>4.1.13.Final</version>
+        </dependency>
+        <dependency>
             <groupId>org.elasticsearch.client</groupId>
-            <artifactId>transport</artifactId>
+            <artifactId>elasticsearch-rest-high-level-client</artifactId>
             <version>${global_elasticsearch_version}</version>
-            <exclusions>
-              <exclusion>
-                <groupId>com.fasterxml.jackson.dataformat</groupId>
-                <artifactId>jackson-dataformat-smile</artifactId>
-              </exclusion>
-              <exclusion>
-                <groupId>com.fasterxml.jackson.dataformat</groupId>
-                <artifactId>jackson-dataformat-yaml</artifactId>
-              </exclusion>
-              <exclusion>
-                <groupId>com.fasterxml.jackson.dataformat</groupId>
-                <artifactId>jackson-dataformat-cbor</artifactId>
-              </exclusion>
-              <exclusion>
-                <groupId>com.fasterxml.jackson.core</groupId>
-                <artifactId>jackson-core</artifactId>
-              </exclusion>
-              <exclusion>
-                <groupId>org.slf4j</groupId>
-                <artifactId>slf4j-api</artifactId>
-              </exclusion>
-              <exclusion>
-                <groupId>org.slf4j</groupId>
-                <artifactId>slf4j-log4j12</artifactId>
-              </exclusion>
-              <exclusion>
-                <groupId>log4j</groupId>
-                <artifactId>log4j</artifactId>
-              </exclusion>
-            </exclusions>
-          </dependency>
+        </dependency>
           <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-core</artifactId>
@@ -95,10 +70,6 @@
                     <groupId>com.fasterxml.jackson.core</groupId>
                     <artifactId>jackson-core</artifactId>
                 </exclusion>
-                <exclusion> <!-- this is causing a weird build error if not excluded - Error creating shaded jar: null: IllegalArgumentException -->
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-api</artifactId>
-                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -155,10 +126,6 @@
                                     <shadedPattern>org.apache.metron.io.netty</shadedPattern>
                                 </relocation>
                                 <relocation>
-                                    <pattern>org.apache.logging.log4j</pattern>
-                                    <shadedPattern>org.apache.metron.logging.log4j</shadedPattern>
-                                </relocation>
-                                <relocation>
                                     <pattern>com.google.common</pattern>
                                     <shadedPattern>org.apache.metron.guava.elasticsearch-shaded</shadedPattern>
                                 </relocation>
diff --git a/metron-platform/elasticsearch-shaded/src/main/resources/META-INF/log4j-provider.properties b/metron-platform/elasticsearch-shaded/src/main/resources/META-INF/log4j-provider.properties
deleted file mode 100644
index c4bd3f0..0000000
--- a/metron-platform/elasticsearch-shaded/src/main/resources/META-INF/log4j-provider.properties
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-LoggerContextFactory = org.apache.metron.logging.log4j.core.impl.Log4jContextFactory
-Log4jAPIVersion = 2.6.0
-FactoryPriority= 10
\ No newline at end of file
diff --git a/metron-platform/metron-common/README.md b/metron-platform/metron-common/README.md
index dac1974..d68259a 100644
--- a/metron-platform/metron-common/README.md
+++ b/metron-platform/metron-common/README.md
@@ -79,29 +79,30 @@
 Various parts of our stack uses the global config are documented throughout the Metron documentation,
 but a convenient index is provided here:
 
-| Property Name                                                                                                       | Subsystem     | Type       | Ambari Property            |
-|---------------------------------------------------------------------------------------------------------------------|---------------|------------|----------------------------|
-| [`es.clustername`](../metron-elasticsearch#esclustername)                                                           | Indexing      | String     | `es_cluster_name`          |
-| [`es.ip`](../metron-elasticsearch#esip)                                                                             | Indexing      | String     | `es_hosts`                 |
-| [`es.port`](../metron-elasticsearch#esport)                                                                         | Indexing      | String     | `es_port`                  |
-| [`es.date.format`](../metron-elasticsearch#esdateformat)                                                            | Indexing      | String     | `es_date_format`           |
-| [`fieldValidations`](#validation-framework)                                                                         | Parsing       | Object     | N/A                        |
-| [`parser.error.topic`](../metron-parsers#parsererrortopic)                                                          | Parsing       | String     | N/A                        |
-| [`stellar.function.paths`](../../metron-stellar/stellar-common#stellarfunctionpaths)                                | Stellar       | CSV String | N/A                        |
-| [`stellar.function.resolver.includes`](../../metron-stellar/stellar-common#stellarfunctionresolverincludesexcludes) | Stellar       | CSV String | N/A                        |
-| [`stellar.function.resolver.excludes`](../../metron-stellar/stellar-common#stellarfunctionresolverincludesexcludes) | Stellar       | CSV String | N/A                        |
-| [`profiler.period.duration`](../../metron-analytics/metron-profiler#profilerperiodduration)                         | Profiler      | Integer    | `profiler_period_duration` |
-| [`profiler.period.duration.units`](../../metron-analytics/metron-profiler#profilerperioddurationunits)              | Profiler      | String     | `profiler_period_units`    |
-| [`profiler.writer.batchSize`](../../metron-analytics/metron-profiler/#profilerwriterbatchsize)                      | Profiler      | Integer    |  N/A                       |
-| [`profiler.writer.batchTimeout`](../../metron-analytics/metron-profiler/#profilerwriterbatchtimeout)                | Profiler      | Integer    |  N/A                       |
-| [`update.hbase.table`](../metron-indexing#updatehbasetable)                                                         | REST/Indexing | String     | `update_hbase_table`       |
-| [`update.hbase.cf`](../metron-indexing#updatehbasecf)                                                               | REST/Indexing | String     | `update_hbase_cf`          |
-| [`geo.hdfs.file`](../metron-enrichment#geohdfsfile)                                                                 | Enrichment    | String     | `geo_hdfs_file`            |
-| [`enrichment.writer.batchSize`](../metron-enrichment#enrichmentwriterbatchsize)                                     | Enrichment    | Integer    |  N/A                       |
-| [`enrichment.writer.batchTimeout`](../metron-enrichment#enrichmentwriterbatchtimeout)                               | Enrichment    | Integer    |  N/A                       |
-| [`geo.hdfs.file`](../metron-enrichment#geohdfsfile)                                                                 | Enrichment    | String     | `geo_hdfs_file`            |
-| [`source.type.field`](../../metron-interface/metron-alerts#sourcetypefield)                                         | UI            | String     |  `source_type_field`       |
-| [`threat.triage.score.field`](../../metron-interface/metron-alerts#threattriagescorefield)                          | UI            | String     |  `threat_triage_score_field`       |
+| Property Name                                                                                                       | Subsystem     | Type       | Ambari Property              |
+|---------------------------------------------------------------------------------------------------------------------|---------------|------------|------------------------------|
+| [`es.clustername`](../metron-elasticsearch#esclustername)                                                           | Indexing      | String     | `es_cluster_name`            |
+| [`es.ip`](../metron-elasticsearch#esip)                                                                             | Indexing      | String     | `es_hosts`                   |
+| [`es.port`](../metron-elasticsearch#esport)                                                                         | Indexing      | String     | `es_port`                    |
+| [`es.date.format`](../metron-elasticsearch#esdateformat)                                                            | Indexing      | String     | `es_date_format`             |
+| [`es.client.settings`](../metron-elasticsearch#esclientsettings)                                                    | Indexing      | Object     |  N/A                         |
+| [`fieldValidations`](#validation-framework)                                                                         | Parsing       | Object     |  N/A                         |
+| [`parser.error.topic`](../metron-parsers#parsererrortopic)                                                          | Parsing       | String     |  N/A                         |
+| [`stellar.function.paths`](../../metron-stellar/stellar-common#stellarfunctionpaths)                                | Stellar       | CSV String |  N/A                         |
+| [`stellar.function.resolver.includes`](../../metron-stellar/stellar-common#stellarfunctionresolverincludesexcludes) | Stellar       | CSV String |  N/A                         |
+| [`stellar.function.resolver.excludes`](../../metron-stellar/stellar-common#stellarfunctionresolverincludesexcludes) | Stellar       | CSV String |  N/A                         |
+| [`profiler.period.duration`](../../metron-analytics/metron-profiler#profilerperiodduration)                         | Profiler      | Integer    | `profiler_period_duration`   |
+| [`profiler.period.duration.units`](../../metron-analytics/metron-profiler#profilerperioddurationunits)              | Profiler      | String     | `profiler_period_units`      |
+| [`profiler.writer.batchSize`](../../metron-analytics/metron-profiler/#profilerwriterbatchsize)                      | Profiler      | Integer    |  N/A                         |
+| [`profiler.writer.batchTimeout`](../../metron-analytics/metron-profiler/#profilerwriterbatchtimeout)                | Profiler      | Integer    |  N/A                         |
+| [`update.hbase.table`](../metron-indexing#updatehbasetable)                                                         | REST/Indexing | String     | `update_hbase_table`         |
+| [`update.hbase.cf`](../metron-indexing#updatehbasecf)                                                               | REST/Indexing | String     | `update_hbase_cf`            |
+| [`geo.hdfs.file`](../metron-enrichment#geohdfsfile)                                                                 | Enrichment    | String     | `geo_hdfs_file`              |
+| [`enrichment.writer.batchSize`](../metron-enrichment#enrichmentwriterbatchsize)                                     | Enrichment    | Integer    |  N/A                         |
+| [`enrichment.writer.batchTimeout`](../metron-enrichment#enrichmentwriterbatchtimeout)                               | Enrichment    | Integer    |  N/A                         |
+| [`geo.hdfs.file`](../metron-enrichment#geohdfsfile)                                                                 | Enrichment    | String     | `geo_hdfs_file`              |
+| [`source.type.field`](../../metron-interface/metron-alerts#sourcetypefield)                                         | UI            | String     | `source_type_field`          |
+| [`threat.triage.score.field`](../../metron-interface/metron-alerts#threattriagescorefield)                          | UI            | String     | `threat_triage_score_field`  |
 
 ## Note Configs in Ambari
 If a field is managed via ambari, you should change the field via
@@ -439,3 +440,4 @@
   -p DIRECTORY, --hdp_home=DIRECTORY
                         HDP home directory
 ```
+
diff --git a/metron-platform/metron-common/src/main/config/zookeeper/global.json b/metron-platform/metron-common/src/main/config/zookeeper/global.json
index 9e5402e..b638ca3 100644
--- a/metron-platform/metron-common/src/main/config/zookeeper/global.json
+++ b/metron-platform/metron-common/src/main/config/zookeeper/global.json
@@ -6,6 +6,5 @@
   "update.hbase.table": "metron_update",
   "update.hbase.cf": "t",
   "es.client.settings": {
-    "client.transport.ping_timeout": "500s"
   }
 }
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
index 6308f0a..14d5b69 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigOption.java
@@ -30,6 +30,13 @@
     return (s, o) -> o;
   }
 
+  /**
+   * Returns true if the given map contains an entry for this config option's key (see {@code getKey()}).
+   */
+  default boolean containsOption(Map<String, Object> map) {
+    return map.containsKey(getKey());
+  }
+
   default void put(Map<String, Object> map, Object value) {
     map.put(getKey(), value);
   }
diff --git a/metron-platform/metron-elasticsearch/README.md b/metron-platform/metron-elasticsearch/README.md
index 177412e..463a0b8 100644
--- a/metron-platform/metron-elasticsearch/README.md
+++ b/metron-platform/metron-elasticsearch/README.md
@@ -59,6 +59,49 @@
 roll hourly, whereas an `es.date.format` of `yyyy.MM.dd` would have the consequence that the indices would
 roll daily.
 
+### `es.client.settings`
+
+This field in global config allows you to specify Elasticsearch REST client options. These are used in conjunction with the previously mentioned Elasticsearch properties
+when setting up client connections to an Elasticsearch cluster. The available properties should be supplied as an object map. Current available options are as follows:
+
+| Property Name                       | Type      | Required? | Default Value  | Description                                                                                                                                                         |
+|-------------------------------------|-----------|-----------|----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| connection.timeout.millis           | Integer   | No        | 1000           | Sets connection timeout.                                                                                                                                            |
+| socket.timeout.millis               | Integer   | No        | 30000          | Sets socket timeout.                                                                                                                                                |
+| max.retry.timeout.millis            | Integer   | No        | 30000          | Sets the maximum timeout (in milliseconds) to honour in case of multiple retries of the same request.                                                               |
+| num.client.connection.threads       | Integer   | No        | 1              | Number of worker threads used by the connection manager. Defaults to Runtime.getRuntime().availableProcessors().                                                    |
+| xpack.username                      | String    | No        | null           | X-Pack username.                                                                                                                                                    |
+| xpack.password.file                 | String    | No        | null           | 1-line HDFS file where the X-Pack password is set.                                                                                                                  |
+| ssl.enabled                         | Boolean   | No        | false          | Turn on SSL connections.                                                                                                                                            |
+| keystore.type                       | String    | No        | "jks"          | Allows you to specify a keystore type. See https://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#KeyStore for more details.            |
+| keystore.path                       | String    | No        | null           | Path to the Trust Store that holds your Elasticsearch certificate authorities and certificate.                                                                      |
+| keystore.password.file              | String    | No        | null           | 1-line HDFS file where the keystore password is set.                                                                                                                |
+
+__Note:__ The migration from Elasticsearch's TransportClient to the Java REST client has resulted in changes to some existing properties. Below is a mapping of the old properties to the new ones:
+
+| Old Property Name                      | New Property Name                   |
+|----------------------------------------|-------------------------------------|
+| client.transport.ping_timeout          | n/a                                 |
+| n/a                                    | connection.timeout.millis           |
+| n/a                                    | socket.timeout.millis               |
+| n/a                                    | max.retry.timeout.millis            |
+| n/a                                    | num.client.connection.threads       |
+| es.client.class                        | n/a                                 |
+| es.xpack.username                      | xpack.username                      |
+| es.xpack.password.file                 | xpack.password.file                 |
+| xpack.security.transport.ssl.enabled   | ssl.enabled                         |
+| xpack.ssl.key                          | n/a                                 |
+| xpack.ssl.certificate                  | n/a                                 |
+| xpack.ssl.certificate_authorities      | n/a                                 |
+| n/a                                    | keystore.type                       |
+| keystore.path                          | keystore.path                       |
+| n/a                                    | keystore.password.file              |
+
+__Notes:__
+* The transport client implementation provides for a 'xpack.security.user' property, however we never used this property directly. Rather, in order to secure the password we used custom properties for user/pass. These properties have been carried over as `xpack.username` and `xpack.password.file`.
+* See [https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_common_configuration.html](https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_common_configuration.html) for more specifics on the new client properties.
+* Other notes on JSSE - [https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html](https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html)
+
 ## Upgrading to 5.6.2
 
 Users should be prepared to re-index when migrating from Elasticsearch 2.3.3 to 5.6.2. There are a number of template changes, most notably around
@@ -287,7 +330,7 @@
 
 ## Using Metron with Elasticsearch 5.6.2
 
-Although infrequent sometimes an internal field is added in Metron and existing templates must be updated.  The following steps outlines how to do this, using `metron_alert` as an example.
+Although infrequent, sometimes an internal field is added in Metron and existing templates must be updated.  The following steps outline how to do this, using `metron_alert` as an example.
 
 With the addition of the meta alert feature, there is a requirement that all sensors templates have a nested `metron_alert` field defined.  This field is a dummy field.  See [Ignoring Unmapped Fields](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-sort.html#_ignoring_unmapped_fields) for more information
 
diff --git a/metron-platform/metron-elasticsearch/pom.xml b/metron-platform/metron-elasticsearch/pom.xml
index adc601a..e3cf840 100644
--- a/metron-platform/metron-elasticsearch/pom.xml
+++ b/metron-platform/metron-elasticsearch/pom.xml
@@ -73,6 +73,17 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+            <version>4.4.9</version>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch.plugin</groupId>
+            <artifactId>transport-netty4-client</artifactId>
+            <version>${global_elasticsearch_version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.metron</groupId>
             <artifactId>metron-hbase</artifactId>
             <version>${project.parent.version}</version>
@@ -206,22 +217,6 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-api</artifactId>
-            <version>${global_log4j_core_version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.logging.log4j</groupId>
-            <artifactId>log4j-core</artifactId>
-            <version>${global_log4j_core_version}</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava-testlib</artifactId>
-            <version>${global_guava_version}</version>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 
     <build>
@@ -283,7 +278,6 @@
                                 </excludes>
                             </artifactSet>
                             <transformers>
-                                <transformer implementation="org.atteo.classindex.ClassIndexTransformer"/>
                                 <transformer
                                   implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
                                      <resources>
@@ -297,13 +291,15 @@
                                 <!--transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
                                     <addHeader>false</addHeader>
                                     <projectName>${project.name}</projectName>
-                                </transformer-->
+                                </transformer>-->
                                 <transformer
                                         implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                 <transformer
                                         implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                     <mainClass></mainClass>
                                 </transformer>
+                                <!-- ClassIndexTransformer needs to go LAST. For some reason it will clobber other transformers from operating when it is put first -->
+                                <transformer implementation="org.atteo.classindex.ClassIndexTransformer"/>
                             </transformers>
                         </configuration>
                     </execution>
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClient.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClient.java
new file mode 100644
index 0000000..d62a7c0
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClient.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.client;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.StringEntity;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.elasticsearch.utils.FieldMapping;
+import org.apache.metron.elasticsearch.utils.FieldProperties;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+
+/**
+ * Wrapper around the Elasticsearch REST clients. Exposes capabilities of the low and high-level clients.
+ * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/java-rest-overview.html. Most, if not
+ * all, usage in Metron should go through the high-level client, which handles marshaling/unmarshaling.
+ */
+public class ElasticsearchClient implements AutoCloseable {
+  private final RestClient lowLevelClient;
+  private final RestHighLevelClient highLevelClient;
+
+  /**
+   * Instantiate with ElasticsearchClientFactory.
+   *
+   * @param lowLevelClient low-level REST client; used directly by the helper methods of this class
+   * @param highLevelClient high-level REST client exposed through {@link #getHighLevelClient()}
+   */
+  public ElasticsearchClient(RestClient lowLevelClient, RestHighLevelClient highLevelClient) {
+    this.lowLevelClient = lowLevelClient;
+    this.highLevelClient = highLevelClient;
+  }
+
+  /**
+   * Exposes an Elasticsearch low-level client. Prefer the high level client.
+   */
+  public RestClient getLowLevelClient() {
+    return lowLevelClient;
+  }
+
+  /**
+   * <p>
+   * Exposes an Elasticsearch high-level client. Prefer to use this client over the low-level client where possible. This client wraps the low-level
+   * client and exposes some additional sugar on top of the low level methods including marshaling/unmarshaling.
+   * </p>
+   * <p>
+   *   Note, as of 5.6.2 it does NOT support index or cluster management operations.
+   *   https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_changing_the_application_8217_s_code.html
+   *   <br>
+   * &nbsp;&nbsp;&nbsp;&nbsp;<i>Does not provide indices or cluster management APIs. Management operations can be executed by external scripts or using the low-level client.</i>
+   * </p>
+   * <p>
+   * Current supported ES API's seen here - https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/java-rest-high-supported-apis.html
+   * </p>
+   *
+   * <ul>
+   *   <li>Single document APIs
+   *     <ul>
+   *       <li>Index API</li>
+   *       <li>Get API</li>
+   *       <li>Delete API</li>
+   *       <li>Update API</li>
+   *     </ul>
+   *   </li>
+   *   <li>Multi document APIs
+   *     <ul>
+   *       <li>Bulk API</li>
+   *     </ul>
+   *   </li>
+   *   <li>Search APIs
+   *     <ul>
+   *       <li>Search API</li>
+   *       <li>Search Scroll API</li>
+   *       <li>Clear Scroll API</li>
+   *     </ul>
+   *   </li>
+   *   <li>Miscellaneous APIs
+   *     <ul>
+   *       <li>Info API</li>
+   *     </ul>
+   *   </li>
+   * </ul>
+   */
+  public RestHighLevelClient getHighLevelClient() {
+    return highLevelClient;
+  }
+
+  /**
+   * Included as part of AutoCloseable because Elasticsearch recommends closing the client when not
+   * being used. Only the low-level client is closed here.
+   * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_changing_the_client_8217_s_initialization_code.html
+   * @throws IOException if closing the low-level client fails
+   */
+  @Override
+  public void close() throws IOException {
+    if (lowLevelClient != null) {
+      lowLevelClient.close();
+    }
+  }
+
+  /**
+   * Puts a mapping on an index. https://www.elastic.co/guide/en/elasticsearch/reference/5.6/indices-put-mapping.html
+   * @param index name of the index to update
+   * @param mappingType https://www.elastic.co/guide/en/elasticsearch/reference/5.6/mapping.html#mapping-type
+   * @param mapping mapping definition, sent as a JSON request body
+   * @throws IOException if the request fails; an IllegalStateException is thrown if the response status is not 200
+   */
+  public void putMapping(String index, String mappingType, String mapping) throws IOException {
+    // Declare the body as UTF-8 JSON; a bare StringEntity is sent as ISO-8859-1 text/plain.
+    HttpEntity entity = new StringEntity(mapping, org.apache.http.entity.ContentType.APPLICATION_JSON);
+    Response response = lowLevelClient.performRequest("PUT"
+            , "/" + index + "/_mapping/" + mappingType
+            , Collections.emptyMap()
+            , entity
+    );
+    if(response.getStatusLine().getStatusCode() != 200) {
+      String responseStr = IOUtils.toString(response.getEntity().getContent(), "UTF-8");
+      throw new IllegalStateException("Got a " + response.getStatusLine().getStatusCode() + " due to " + responseStr);
+    }
+  }
+
+  /**
+   * Gets ALL Elasticsearch indices, or null if status code returned is not OK 200.
+   */
+  public String[] getIndices() throws IOException {
+    Response response = lowLevelClient.performRequest("GET", "/_cat/indices");
+    if(response.getStatusLine().getStatusCode() != 200) {
+      return null;
+    }
+    String responseStr = IOUtils.toString(response.getEntity().getContent(), "UTF-8");
+    List<String> indices = new ArrayList<>();
+    for(String line : Splitter.on("\n").split(responseStr)) {
+      // Collapse runs of whitespace so the columns of each row can be split on a single space.
+      Iterable<String> splits = Splitter.on(" ").split(line.replaceAll("\\s+", " ").trim());
+      // The index name is read from the third column; rows with three or fewer columns are skipped.
+      if(Iterables.size(splits) > 3) {
+        String index = Iterables.get(splits, 2, "");
+        if(!StringUtils.isEmpty(index)) {
+          indices.add(index.trim());
+        }
+      }
+    }
+    return indices.toArray(new String[0]);
+  }
+
+  /**
+   * Gets FieldMapping detail for a list of indices. Returns an empty map if the status code is not OK 200.
+   *
+   * @param indices get field mappings for the provided indices
+   * @return mapping of index name to FieldMapping
+   */
+  public Map<String, FieldMapping> getMappingByIndex(String[] indices) throws IOException {
+    Map<String, FieldMapping> ret = new HashMap<>();
+    String indicesCsv = Joiner.on(",").join(indices);
+    Response response = lowLevelClient.performRequest("GET", "/" + indicesCsv + "/_mapping");
+    if(response.getStatusLine().getStatusCode() == 200) {
+      String responseStr = IOUtils.toString(response.getEntity().getContent(), "UTF-8");
+      Map<String, Object> indexToMapping = JSONUtils.INSTANCE.load(responseStr, JSONUtils.MAP_SUPPLIER);
+      for(Map.Entry<String, Object> index2Mapping : indexToMapping.entrySet()) {
+        String index = index2Mapping.getKey();
+        Map<String, Object> mappings = getInnerMap((Map<String, Object>)index2Mapping.getValue(), "mappings");
+        if(mappings != null && mappings.size() > 0) { // getInnerMap returns null when "mappings" is absent
+          Map.Entry<String, Object> docMap = Iterables.getFirst(mappings.entrySet(), null); // only the first mapping type is inspected
+          if(docMap != null) {
+            Map<String, Object> fieldPropertiesMap = getInnerMap((Map<String, Object>)docMap.getValue(), "properties");
+            if(fieldPropertiesMap != null) {
+              FieldMapping mapping = new FieldMapping();
+              for (Map.Entry<String, Object> field2PropsKV : fieldPropertiesMap.entrySet()) {
+                if(field2PropsKV.getValue() != null) {
+                  FieldProperties props = new FieldProperties((Map<String, Object>) field2PropsKV.getValue());
+                  mapping.put(field2PropsKV.getKey(), props);
+                }
+              }
+              ret.put(index, mapping);
+            }
+          }
+        }
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Traverses the outer map to retrieve a leaf map by iteratively calling get(key) using the provided keys in order. e.g.
+   * for an outer map provided as follows:
+   * <pre>
+   * {
+   *   "foo" : {
+   *     "bar" : {
+   *       "baz" : {
+   *         "hello" : "world"
+   *       }
+   *     }
+   *   }
+   * }
+   * </pre>
+   * calling getInnerMap(outerMap, new String[] { "foo", "bar", "baz" }) would return the following:
+   * <pre>
+   * {hello=world}
+   * </pre>
+   * @param outerMap Complex map of nested keys/values
+   * @param keys ordered list of keys to iterate over to grab a leaf mapping.
+   * @return outerMap if no keys are given; otherwise the map found under the last key, or null as soon as a key is missing
+   */
+  private Map<String, Object> getInnerMap(Map<String, Object> outerMap, String... keys) {
+    Map<String, Object> ret = outerMap;
+    if(keys.length == 0) {
+      return outerMap;
+    }
+    for(String key : keys) {
+      ret = (Map<String, Object>)ret.get(key);
+      if(ret == null) {
+        return null;
+      }
+    }
+    return ret;
+  }
+
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClientFactory.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClientFactory.java
new file mode 100644
index 0000000..4e0b2fe
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/client/ElasticsearchClientFactory.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.client;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.KeyManagementException;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import javax.net.ssl.SSLContext;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.ssl.SSLContextBuilder;
+import org.apache.http.ssl.SSLContexts;
+import org.apache.metron.elasticsearch.config.ElasticsearchClientConfig;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils.HostnamePort;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Main entry point to create the ES client.
+ */
+public class ElasticsearchClientFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final String ES_SETTINGS_KEY = "es.client.settings"; // es config key in global config
+
+  /**
+   * Creates an Elasticsearch client from settings provided via the global config.
+   *
+   * @return new client
+   */
+  public static ElasticsearchClient create(Map<String, Object> globalConfig) {
+    ElasticsearchClientConfig esClientConfig = new ElasticsearchClientConfig(
+        getEsSettings(globalConfig));
+    HttpHost[] httpHosts = getHttpHosts(globalConfig, esClientConfig.getConnectionScheme());
+    RestClientBuilder builder = RestClient.builder(httpHosts);
+
+    builder.setRequestConfigCallback(reqConfigBuilder -> {
+      // Modifies request config builder with connection and socket timeouts.
+      // https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_timeouts.html
+      reqConfigBuilder.setConnectTimeout(esClientConfig.getConnectTimeoutMillis());
+      reqConfigBuilder.setSocketTimeout(esClientConfig.getSocketTimeoutMillis());
+      return reqConfigBuilder;
+    });
+    builder.setMaxRetryTimeoutMillis(esClientConfig.getMaxRetryTimeoutMillis());
+
+    builder.setHttpClientConfigCallback(clientBuilder -> {
+      clientBuilder.setDefaultIOReactorConfig(getIOReactorConfig(esClientConfig));
+      clientBuilder.setDefaultCredentialsProvider(getCredentialsProvider(esClientConfig));
+      clientBuilder.setSSLContext(getSSLContext(esClientConfig));
+      return clientBuilder;
+    });
+
+    RestClient lowLevelClient = builder.build();
+    RestHighLevelClient client = new RestHighLevelClient(lowLevelClient);
+    return new ElasticsearchClient(lowLevelClient, client);
+  }
+
+  private static Map<String, Object> getEsSettings(Map<String, Object> globalConfig) {
+    return (Map<String, Object>) globalConfig.getOrDefault(ES_SETTINGS_KEY, new HashMap<>());
+  }
+
+  private static HttpHost[] getHttpHosts(Map<String, Object> globalConfiguration, String scheme) {
+    List<HostnamePort> hps = ElasticsearchUtils.getIps(globalConfiguration);
+    HttpHost[] httpHosts = new HttpHost[hps.size()];
+    int i = 0;
+    for (HostnamePort hp : hps) {
+      httpHosts[i++] = new HttpHost(hp.hostname, hp.port, scheme);
+    }
+    return httpHosts;
+  }
+
+  /**
+   * Creates config with setting for num connection threads. Default is ES client default,
+   * which is 1 to num processors per the documentation.
+   * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_number_of_threads.html
+   */
+  private static IOReactorConfig getIOReactorConfig(ElasticsearchClientConfig esClientConfig) {
+    if (esClientConfig.getNumClientConnectionThreads().isPresent()) {
+      Integer numThreads = esClientConfig.getNumClientConnectionThreads().get();
+      LOG.info("Setting number of client connection threads: {}", numThreads);
+      return IOReactorConfig.custom().setIoThreadCount(numThreads).build();
+    } else {
+      return IOReactorConfig.DEFAULT;
+    }
+  }
+
+  private static CredentialsProvider getCredentialsProvider(
+      ElasticsearchClientConfig esClientConfig) {
+    Optional<Entry<String, String>> credentials = esClientConfig.getCredentials();
+    if (credentials.isPresent()) {
+      LOG.info(
+          "Found auth credentials - setting up user/pass authenticated client connection for ES.");
+      final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+      UsernamePasswordCredentials upcredentials = new UsernamePasswordCredentials(
+          credentials.get().getKey(), credentials.get().getValue());
+      credentialsProvider.setCredentials(AuthScope.ANY, upcredentials);
+      return credentialsProvider;
+    } else {
+      LOG.info(
+          "Elasticsearch client credentials not provided. Defaulting to non-authenticated client connection.");
+      return null;
+    }
+  }
+
+  /**
+   * <p>Setup connection encryption details (SSL) if applicable.
+   * If ssl.enabled=true, sets up SSL connection. If enabled, keystore.path is required. User can
+   * also optionally set keystore.password and keystore.type.
+   * https://www.elastic.co/guide/en/elasticsearch/client/java-rest/5.6/_encrypted_communication.html
+   * <p>
+   * <p>Other guidance on the HTTP Component library and configuring SSL connections.
+   * http://www.robinhowlett.com/blog/2016/01/05/everything-you-ever-wanted-to-know-about-ssl-but-were-afraid-to-ask.
+   * <p>
+   * <p>JSSE docs - https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html
+   * <p>
+   * <p>Additional guidance for configuring Elasticsearch for SSL can be found here - https://www.elastic.co/guide/en/x-pack/5.6/ssl-tls.html
+   */
+  private static SSLContext getSSLContext(ElasticsearchClientConfig esClientConfig) {
+    if (esClientConfig.isSSLEnabled()) {
+      LOG.info("Configuring client for SSL connection.");
+      if (!esClientConfig.getKeyStorePath().isPresent()) {
+        throw new IllegalStateException("KeyStore path must be provided for SSL connection.");
+      }
+      Optional<String> optKeyStorePass = esClientConfig.getKeyStorePassword();
+      char[] keyStorePass = optKeyStorePass.map(String::toCharArray).orElse(null);
+      KeyStore trustStore = getStore(esClientConfig.getKeyStoreType(),
+          esClientConfig.getKeyStorePath().get(), keyStorePass);
+      try {
+        SSLContextBuilder sslBuilder = SSLContexts.custom().loadTrustMaterial(trustStore, null);
+        return sslBuilder.build();
+      } catch (NoSuchAlgorithmException | KeyStoreException | KeyManagementException e) {
+        throw new IllegalStateException("Unable to load truststore.", e);
+      }
+    }
+    return null;
+  }
+
+  private static KeyStore getStore(String type, Path path, char[] pass) {
+    KeyStore store;
+    try {
+      store = KeyStore.getInstance(type);
+    } catch (KeyStoreException e) {
+      throw new IllegalStateException("Unable to get keystore type '" + type + "'", e);
+    }
+    try (InputStream is = Files.newInputStream(path)) {
+      store.load(is, pass);
+    } catch (IOException | NoSuchAlgorithmException | CertificateException e) {
+      throw new IllegalStateException("Unable to load keystore from path '" + path + "'", e);
+    }
+    return store;
+  }
+
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientConfig.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientConfig.java
new file mode 100644
index 0000000..2ca4763
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientConfig.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.config;
+
+import static java.lang.String.format;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.AbstractMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.collections4.map.AbstractMapDecorator;
+import org.apache.commons.lang.StringUtils;
+import org.apache.metron.common.utils.HDFSUtils;
+
+/**
+ * Typed accessors over the map of Elasticsearch client settings; option keys are defined by
+ * {@link ElasticsearchClientOptions}.
+ */
+public class ElasticsearchClientConfig extends AbstractMapDecorator<String, Object> {
+
+  private static final Integer THIRTY_SECONDS_IN_MILLIS = 30_000;
+  private static final Integer ONE_SECOND_IN_MILLIS = 1_000;
+  private static final String DEFAULT_KEYSTORE_TYPE = "JKS";
+
+  /**
+   * Initialize config from provided settings Map.
+   *
+   * @param settings Map of config options from which to initialize.
+   */
+  public ElasticsearchClientConfig(Map<String, Object> settings) {
+    super(settings);
+  }
+
+  /**
+   * @return Connection timeout as specified by user, or default 1s as defined by the ES client.
+   */
+  public Integer getConnectTimeoutMillis() {
+    return ElasticsearchClientOptions.CONNECTION_TIMEOUT_MILLIS
+        .getOrDefault(this, Integer.class, ONE_SECOND_IN_MILLIS);
+  }
+
+  /**
+   * @return socket timeout specified by user, or default 30s as defined by the ES client.
+   */
+  public Integer getSocketTimeoutMillis() {
+    return ElasticsearchClientOptions.SOCKET_TIMEOUT_MILLIS
+        .getOrDefault(this, Integer.class, THIRTY_SECONDS_IN_MILLIS);
+  }
+
+  /**
+   * @return max retry timeout specified by user, or default 30s as defined by the ES client.
+   */
+  public Integer getMaxRetryTimeoutMillis() {
+    return ElasticsearchClientOptions.MAX_RETRY_TIMEOUT_MILLIS
+        .getOrDefault(this, Integer.class, THIRTY_SECONDS_IN_MILLIS);
+  }
+
+  /**
+   * Elasticsearch X-Pack credentials, read only when a password file is configured.
+   *
+   * @return Username, password; empty if no password file is configured.
+   * @throws IllegalArgumentException if a password file is configured without a username.
+   */
+  public Optional<Map.Entry<String, String>> getCredentials() {
+    if (!ElasticsearchClientOptions.XPACK_PASSWORD_FILE.containsOption(this)) {
+      return Optional.empty();
+    }
+    String user = ElasticsearchClientOptions.XPACK_USERNAME.containsOption(this)
+        ? ElasticsearchClientOptions.XPACK_USERNAME.get(this, String.class) : null;
+    if (StringUtils.isEmpty(user)) {
+      throw new IllegalArgumentException(
+          "X-pack username is required when password supplied and cannot be empty");
+    }
+    String password = getPasswordFromFile(
+        ElasticsearchClientOptions.XPACK_PASSWORD_FILE.get(this, String.class));
+    if (password == null) {
+      return Optional.empty();
+    }
+    return Optional.of(new AbstractMap.SimpleImmutableEntry<>(user, password));
+  }
+
+  /**
+   * Expects single password on first line.
+   *
+   * @throws IllegalArgumentException if the file contains no lines.
+   */
+  private static String getPasswordFromFile(String hdfsPath) {
+    List<String> lines = readLines(hdfsPath);
+    if (lines.isEmpty()) {
+      throw new IllegalArgumentException(format("No password found in file '%s'", hdfsPath));
+    }
+    return lines.get(0);
+  }
+
+  /**
+   * Read all lines from HDFS file; used for both the X-Pack and keystore password files.
+   *
+   * @param hdfsPath path to file
+   * @return lines
+   */
+  private static List<String> readLines(String hdfsPath) {
+    try {
+      return HDFSUtils.readFile(hdfsPath);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          format("Unable to read password file from HDFS location '%s'", hdfsPath), e);
+    }
+  }
+
+  /** Determines if SSL is enabled from user-supplied config ssl.enabled. */
+  public boolean isSSLEnabled() {
+    return ElasticsearchClientOptions.SSL_ENABLED.getOrDefault(this, Boolean.class, false);
+  }
+
+  /** http by default, https if ssl is enabled. */
+  public String getConnectionScheme() {
+    return isSSLEnabled() ? "https" : "http";
+  }
+
+  /**
+   * @return Number of threads to use for client connection; empty if not set or set to null.
+   */
+  public Optional<Integer> getNumClientConnectionThreads() {
+    if (!ElasticsearchClientOptions.NUM_CLIENT_CONNECTION_THREADS.containsOption(this)) {
+      return Optional.empty();
+    }
+    // ofNullable: a key that is present but resolves to null must not raise an NPE
+    Integer threads = ElasticsearchClientOptions.NUM_CLIENT_CONNECTION_THREADS
+        .get(this, Integer.class);
+    return Optional.ofNullable(threads);
+  }
+
+  /**
+   * @return User-defined keystore type. Defaults to "JKS" if not defined.
+   */
+  public String getKeyStoreType() {
+    String type = ElasticsearchClientOptions.KEYSTORE_TYPE.containsOption(this)
+        ? ElasticsearchClientOptions.KEYSTORE_TYPE.get(this, String.class) : null;
+    return StringUtils.isNotEmpty(type) ? type : DEFAULT_KEYSTORE_TYPE;
+  }
+
+  /**
+   * Reads keystore password from the HDFS file defined by setting "keystore.password.file", if it
+   * exists.
+   *
+   * @return password if it exists, empty optional otherwise.
+   */
+  public Optional<String> getKeyStorePassword() {
+    if (!ElasticsearchClientOptions.KEYSTORE_PASSWORD_FILE.containsOption(this)) {
+      return Optional.empty();
+    }
+    String password = getPasswordFromFile(
+        ElasticsearchClientOptions.KEYSTORE_PASSWORD_FILE.get(this, String.class));
+    return Optional.ofNullable(password).filter(StringUtils::isNotEmpty);
+  }
+
+  /**
+   * @return keystore path; empty if not set or set to null.
+   */
+  public Optional<Path> getKeyStorePath() {
+    if (!ElasticsearchClientOptions.KEYSTORE_PATH.containsOption(this)) {
+      return Optional.empty();
+    }
+    // ofNullable guards Paths.get against a null value
+    return Optional.ofNullable(ElasticsearchClientOptions.KEYSTORE_PATH.get(this, String.class))
+        .map(Paths::get);
+  }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientOptions.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientOptions.java
new file mode 100644
index 0000000..c92a34f
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/config/ElasticsearchClientOptions.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.config;
+
+import org.apache.metron.common.configuration.ConfigOption;
+
+/** Configuration option keys for the Elasticsearch client. */
+public enum ElasticsearchClientOptions implements ConfigOption {
+  CONNECTION_TIMEOUT_MILLIS("connection.timeout.millis"),
+  SOCKET_TIMEOUT_MILLIS("socket.timeout.millis"),
+  MAX_RETRY_TIMEOUT_MILLIS("max.retry.timeout.millis"),
+  NUM_CLIENT_CONNECTION_THREADS("num.client.connection.threads"),
+  // authentication
+  XPACK_USERNAME("xpack.username"),
+  XPACK_PASSWORD_FILE("xpack.password.file"),
+  // security/encryption
+  SSL_ENABLED("ssl.enabled"),
+  KEYSTORE_TYPE("keystore.type"),
+  KEYSTORE_PATH("keystore.path"),
+  KEYSTORE_PASSWORD_FILE("keystore.password.file");
+
+  /** String key identifying this option. */
+  private final String key;
+
+  ElasticsearchClientOptions(String key) {
+    this.key = key;
+  }
+
+  @Override
+  public String getKey() {
+    return key;
+  }
+
+  /** Prints every option key to standard out, separated (not terminated) by line separators. */
+  public static void printOptions() {
+    ElasticsearchClientOptions[] all = values();
+    StringBuilder out = new StringBuilder();
+    for (int i = 0; i < all.length; i++) {
+      out.append(i == 0 ? "" : System.lineSeparator()).append(all[i].getKey());
+    }
+    System.out.print(out.toString());
+  }
+
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
index 6a8cad8..cb44694 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
@@ -18,26 +18,23 @@
 
 package org.apache.metron.elasticsearch.dao;
 
-import org.apache.metron.indexing.dao.ColumnMetadataDao;
-import org.apache.metron.indexing.dao.search.FieldType;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
-import org.elasticsearch.client.AdminClient;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
 
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
-import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.utils.FieldMapping;
+import org.apache.metron.elasticsearch.utils.FieldProperties;
+import org.apache.metron.indexing.dao.ColumnMetadataDao;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Responsible for retrieving column-level metadata for Elasticsearch search indices.
@@ -61,16 +58,13 @@
     elasticsearchTypeMap = Collections.unmodifiableMap(fieldTypeMap);
   }
 
-  /**
-   * An Elasticsearch administrative client.
-   */
-  private transient AdminClient adminClient;
+  private transient ElasticsearchClient esClient;
 
   /**
-   * @param adminClient The Elasticsearch admin client.
+   * @param esClient The Elasticsearch client.
    */
-  public ElasticsearchColumnMetadataDao(AdminClient adminClient) {
-    this.adminClient = adminClient;
+  public ElasticsearchColumnMetadataDao(ElasticsearchClient esClient) {
+    this.esClient = esClient;
   }
 
   @SuppressWarnings("unchecked")
@@ -82,51 +76,40 @@
 
     String[] latestIndices = getLatestIndices(indices);
     if (latestIndices.length > 0) {
-      ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = adminClient
-              .indices()
-              .getMappings(new GetMappingsRequest().indices(latestIndices))
-              .actionGet()
-              .getMappings();
+
+     Map<String, FieldMapping>  mappings = esClient.getMappingByIndex(latestIndices);
 
       // for each index
-      for (Object key : mappings.keys().toArray()) {
-        String indexName = key.toString();
-        ImmutableOpenMap<String, MappingMetaData> mapping = mappings.get(indexName);
+      for (Map.Entry<String, FieldMapping> kv : mappings.entrySet()) {
+        String indexName = kv.getKey();
+        FieldMapping mapping = kv.getValue();
 
         // for each mapping in the index
-        Iterator<String> mappingIterator = mapping.keysIt();
-        while (mappingIterator.hasNext()) {
-          MappingMetaData mappingMetaData = mapping.get(mappingIterator.next());
-          Map<String, Object> sourceAsMap = mappingMetaData.getSourceAsMap();
-          if (sourceAsMap.containsKey("properties")) {
-            Map<String, Map<String, String>> map = (Map<String, Map<String, String>>) sourceAsMap.get("properties");
+        for(Map.Entry<String, FieldProperties> fieldToProperties : mapping.entrySet()) {
+          String field = fieldToProperties.getKey();
+          FieldProperties properties = fieldToProperties.getValue();
+          if (!fieldBlackList.contains(field)) {
+            FieldType type = toFieldType((String) properties.get("type"));
 
-            // for each field in the mapping
-            for (String field : map.keySet()) {
-              if (!fieldBlackList.contains(field)) {
-                FieldType type = toFieldType(map.get(field).get("type"));
+            if(!indexColumnMetadata.containsKey(field)) {
+              indexColumnMetadata.put(field, type);
 
-                if(!indexColumnMetadata.containsKey(field)) {
-                  indexColumnMetadata.put(field, type);
+              // record the last index in which a field exists, to be able to print helpful error message on type mismatch
+              previousIndices.put(field, indexName);
 
-                  // record the last index in which a field exists, to be able to print helpful error message on type mismatch
-                  previousIndices.put(field, indexName);
-
-                } else {
-                  FieldType previousType = indexColumnMetadata.get(field);
-                  if (!type.equals(previousType)) {
-                    String previousIndexName = previousIndices.get(field);
-                    LOG.error(String.format(
+            } else {
+              FieldType previousType = indexColumnMetadata.get(field);
+              if (!type.equals(previousType)) {
+                String previousIndexName = previousIndices.get(field);
+                LOG.error(String.format(
                         "Field type mismatch: %s.%s has type %s while %s.%s has type %s.  Defaulting type to %s.",
                         indexName, field, type.getFieldType(),
                         previousIndexName, field, previousType.getFieldType(),
                         FieldType.OTHER.getFieldType()));
-                    indexColumnMetadata.put(field, FieldType.OTHER);
+                indexColumnMetadata.put(field, FieldType.OTHER);
 
-                    // the field is defined in multiple indices with different types; ignore the field as type has been set to OTHER
-                    fieldBlackList.add(field);
-                  }
-                }
+                // the field is defined in multiple indices with different types; ignore the field as type has been set to OTHER
+                fieldBlackList.add(field);
               }
             }
           }
@@ -166,15 +149,11 @@
    * @param includeIndices The base names of the indices to include
    * @return The latest version of a set of indices.
    */
-  String[] getLatestIndices(List<String> includeIndices) {
+  String[] getLatestIndices(List<String> includeIndices) throws IOException {
     LOG.debug("Getting latest indices; indices={}", includeIndices);
     Map<String, String> latestIndices = new HashMap<>();
-    String[] indices = adminClient
-            .indices()
-            .prepareGetIndex()
-            .setFeatures()
-            .get()
-            .getIndices();
+
+    String[] indices = esClient.getIndices();
 
     for (String index : indices) {
       int prefixEnd = index.indexOf(INDEX_NAME_DELIMITER);
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index 9f6e1a1..210e1ce 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -22,7 +22,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
 import org.apache.metron.indexing.dao.RetrieveLatestDao;
@@ -38,7 +39,6 @@
 import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
 import org.apache.metron.indexing.dao.update.PatchRequest;
 import org.apache.metron.indexing.dao.update.ReplaceRequest;
-import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +47,7 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private transient TransportClient client;
+  private transient ElasticsearchClient client;
   private ElasticsearchSearchDao searchDao;
   private ElasticsearchUpdateDao updateDao;
   private ElasticsearchRetrieveLatestDao retrieveLatestDao;
@@ -64,7 +64,7 @@
 
   private AccessConfig accessConfig;
 
-  protected ElasticsearchDao(TransportClient client,
+  protected ElasticsearchDao(ElasticsearchClient client,
       AccessConfig config,
       ElasticsearchSearchDao searchDao,
       ElasticsearchUpdateDao updateDao,
@@ -96,10 +96,9 @@
   @Override
   public synchronized void init(AccessConfig config) {
     if (this.client == null) {
-      this.client = ElasticsearchUtils
-          .getClient(config.getGlobalConfigSupplier().get());
+      this.client = ElasticsearchClientFactory.create(config.getGlobalConfigSupplier().get());
       this.accessConfig = config;
-      this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client.admin());
+      this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client);
       this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client);
       this.searchDao = new ElasticsearchSearchDao(client, accessConfig, columnMetadataDao,
           requestSubmitter);
@@ -127,13 +126,13 @@
   }
 
   @Override
-  public Document getLatest(final String guid, final String sensorType) {
+  public Document getLatest(final String guid, final String sensorType) throws IOException {
     return retrieveLatestDao.getLatest(guid, sensorType);
   }
 
   @Override
   public Iterable<Document> getAllLatest(
-      final List<GetRequest> getRequests) {
+      final List<GetRequest> getRequests) throws IOException {
     return retrieveLatestDao.getAllLatest(getRequests);
   }
 
@@ -188,7 +187,7 @@
     return this.updateDao.removeCommentFromAlert(request, latest);
   }
 
-  protected Optional<String> getIndexName(String guid, String sensorType) {
+  protected Optional<String> getIndexName(String guid, String sensorType) throws IOException {
     return updateDao.getIndexName(guid, sensorType);
   }
 
@@ -202,7 +201,7 @@
     return searchDao.group(groupRequest, queryBuilder);
   }
 
-  public TransportClient getClient() {
+  public ElasticsearchClient getClient() {
     return this.client;
   }
 }
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
index fc0b20c..ac5417e 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
@@ -176,7 +176,7 @@
   }
 
   @Override
-  public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException {
+  public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException, IOException {
     return metaAlertSearchDao.getAllMetaAlertsForAlert(guid);
   }
 
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java
index 00fc9d0..65bfa20 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertSearchDao.java
@@ -41,6 +41,8 @@
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.QueryStringQueryBuilder;
 
+import java.io.IOException;
+
 public class ElasticsearchMetaAlertSearchDao implements MetaAlertSearchDao {
 
   protected ElasticsearchDao elasticsearchDao;
@@ -89,7 +91,7 @@
   }
 
   @Override
-  public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException {
+  public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException, IOException {
     if (guid == null || guid.trim().isEmpty()) {
       throw new InvalidSearchException("Guid cannot be empty");
     }
@@ -104,7 +106,7 @@
             ).innerHit(new InnerHitBuilder())
         )
         .must(termQuery(MetaAlertConstants.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
-    return queryAllResults(elasticsearchDao.getClient(), qb, config.getMetaAlertIndex(),
+    return queryAllResults(elasticsearchDao.getClient().getHighLevelClient(), qb, config.getMetaAlertIndex(),
         pageSize);
   }
 }
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
index 3b67891..2e9c855 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertUpdateDao.java
@@ -199,7 +199,7 @@
    * @param alertGuid The GUID of the child alert
    * @return The Elasticsearch response containing the meta alerts
    */
-  protected SearchResponse getMetaAlertsForAlert(String alertGuid) {
+  protected SearchResponse getMetaAlertsForAlert(String alertGuid) throws IOException {
     QueryBuilder qb = boolQuery()
         .must(
             nestedQuery(
@@ -212,7 +212,7 @@
         )
         .must(termQuery(MetaAlertConstants.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()));
     return ElasticsearchUtils
-        .queryAllResults(elasticsearchDao.getClient(), qb, getConfig().getMetaAlertIndex(),
+        .queryAllResults(elasticsearchDao.getClient().getHighLevelClient(), qb, getConfig().getMetaAlertIndex(),
             pageSize);
   }
 
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
index 0e0df21..c63532e 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
@@ -20,13 +20,12 @@
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
-import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.ShardSearchFailure;
-import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.rest.RestStatus;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,9 +42,9 @@
   /**
    * The Elasticsearch client.
    */
-  private TransportClient client;
+  private ElasticsearchClient client;
 
-  public ElasticsearchRequestSubmitter(TransportClient client) {
+  public ElasticsearchRequestSubmitter(ElasticsearchClient client) {
     this.client = client;
   }
 
@@ -60,12 +59,10 @@
     // submit the search request
     org.elasticsearch.action.search.SearchResponse esResponse;
     try {
-      esResponse = client
-              .search(request)
-              .actionGet();
+      esResponse = client.getHighLevelClient().search(request);
       LOG.debug("Got Elasticsearch response; response={}", esResponse.toString());
 
-    } catch (SearchPhaseExecutionException e) {
+    } catch (Exception e) {
       String msg = String.format(
               "Failed to execute search; error='%s', search='%s'",
               ExceptionUtils.getRootCauseMessage(e),
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
index f6bfeda..0c91007 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRetrieveLatestDao.java
@@ -28,33 +28,35 @@
 import java.util.List;
 import java.util.Optional;
 import java.util.function.Function;
+
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.indexing.dao.RetrieveLatestDao;
 import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.update.Document;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 
 public class ElasticsearchRetrieveLatestDao implements RetrieveLatestDao {
 
-  private TransportClient transportClient;
+  private ElasticsearchClient transportClient;
 
-  public ElasticsearchRetrieveLatestDao(TransportClient transportClient) {
+  public ElasticsearchRetrieveLatestDao(ElasticsearchClient transportClient) {
     this.transportClient = transportClient;
   }
 
   @Override
-  public Document getLatest(String guid, String sensorType) {
+  public Document getLatest(String guid, String sensorType) throws IOException {
     Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit));
     return doc.orElse(null);
   }
 
   @Override
-  public Iterable<Document> getAllLatest(List<GetRequest> getRequests) {
+  public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException {
     Collection<String> guids = new HashSet<>();
     Collection<String> sensorTypes = new HashSet<>();
     for (GetRequest getRequest : getRequests) {
@@ -80,7 +82,7 @@
   }
 
   <T> Optional<T> searchByGuid(String guid, String sensorType,
-      Function<SearchHit, Optional<T>> callback) {
+      Function<SearchHit, Optional<T>> callback) throws IOException {
     Collection<String> sensorTypes = sensorType != null ? Collections.singleton(sensorType) : null;
     List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, callback);
     if (results.size() > 0) {
@@ -96,7 +98,7 @@
    * If more than one hit happens, the first one will be returned.
    */
   <T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes,
-      Function<SearchHit, Optional<T>> callback) {
+      Function<SearchHit, Optional<T>> callback) throws IOException {
     if (guids == null || guids.isEmpty()) {
       return Collections.emptyList();
     }
@@ -113,11 +115,13 @@
     for (String guid : guids) {
       query = idsQuery.addIds(guid);
     }
+    SearchRequest request = new SearchRequest();
+    SearchSourceBuilder builder = new SearchSourceBuilder();
+    builder.query(query);
+    builder.size(guids.size());
+    request.source(builder);
 
-    SearchRequestBuilder request = transportClient.prepareSearch()
-        .setQuery(query)
-        .setSize(guids.size());
-    org.elasticsearch.action.search.SearchResponse response = request.get();
+    org.elasticsearch.action.search.SearchResponse response = transportClient.getHighLevelClient().search(request);
     SearchHits hits = response.getHits();
     List<T> results = new ArrayList<>();
     for (SearchHit hit : hits) {
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
index 5cd0a4d..0b87e56 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
@@ -19,24 +19,19 @@
 
 import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
 
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Function;
+
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.search.FieldType;
-import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.Group;
 import org.apache.metron.indexing.dao.search.GroupOrder;
 import org.apache.metron.indexing.dao.search.GroupOrderType;
@@ -50,16 +45,10 @@
 import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.indexing.dao.search.SortField;
 import org.apache.metron.indexing.dao.search.SortOrder;
-import org.apache.metron.indexing.dao.update.Document;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.index.mapper.LegacyIpFieldMapper;
-import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.QueryStringQueryBuilder;
 import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.aggregations.Aggregation;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.Aggregations;
@@ -88,12 +77,12 @@
    */
   private static final String SORT_MISSING_FIRST = "_first";
 
-  private transient TransportClient client;
+  private transient ElasticsearchClient client;
   private AccessConfig accessConfig;
   private ElasticsearchColumnMetadataDao columnMetadataDao;
   private ElasticsearchRequestSubmitter requestSubmitter;
 
-  public ElasticsearchSearchDao(TransportClient client,
+  public ElasticsearchSearchDao(ElasticsearchClient client,
       AccessConfig accessConfig,
       ElasticsearchColumnMetadataDao columnMetadataDao,
       ElasticsearchRequestSubmitter requestSubmitter) {
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
index 6843ac7..c769b2f 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
@@ -28,18 +28,19 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
+
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.search.AlertComment;
 import org.apache.metron.indexing.dao.update.CommentAddRemoveRequest;
 import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.indexing.dao.update.UpdateDao;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
-import org.elasticsearch.client.transport.TransportClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,11 +48,11 @@
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  private transient TransportClient client;
+  private transient ElasticsearchClient client;
   private AccessConfig accessConfig;
   private ElasticsearchRetrieveLatestDao retrieveLatestDao;
 
-  public ElasticsearchUpdateDao(TransportClient client,
+  public ElasticsearchUpdateDao(ElasticsearchClient client,
       AccessConfig accessConfig,
       ElasticsearchRetrieveLatestDao searchDao) {
     this.client = client;
@@ -68,7 +69,7 @@
 
     IndexRequest indexRequest = buildIndexRequest(update, sensorType, indexName);
     try {
-      IndexResponse response = client.index(indexRequest).get();
+      IndexResponse response = client.getHighLevelClient().index(indexRequest);
 
       ShardInfo shardInfo = response.getShardInfo();
       int failed = shardInfo.getFailed();
@@ -87,7 +88,7 @@
     String indexPostfix = ElasticsearchUtils
         .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date());
 
-    BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
+    BulkRequest bulkRequestBuilder = new BulkRequest();
 
     // Get the indices we'll actually be using for each Document.
     for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet()) {
@@ -103,7 +104,7 @@
       bulkRequestBuilder.add(indexRequest);
     }
 
-    BulkResponse bulkResponse = bulkRequestBuilder.get();
+    BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequestBuilder);
     if (bulkResponse.hasFailures()) {
       LOG.error("Bulk Request has failures: {}", bulkResponse.buildFailureMessage());
       throw new IOException(
@@ -181,13 +182,13 @@
     return update(newVersion, Optional.empty());
   }
 
-  protected String getIndexName(Document update, Optional<String> index, String indexPostFix) {
+  protected String getIndexName(Document update, Optional<String> index, String indexPostFix) throws IOException {
     return index.orElse(getIndexName(update.getGuid(), update.getSensorType())
         .orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, null))
     );
   }
 
-  protected Optional<String> getIndexName(String guid, String sensorType) {
+  protected Optional<String> getIndexName(String guid, String sensorType) throws IOException {
     return retrieveLatestDao.searchByGuid(guid,
         sensorType,
         hit -> Optional.ofNullable(hit.getIndex())
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
index 98dc66d..47cbd98 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
@@ -17,55 +17,35 @@
  */
 package org.apache.metron.elasticsearch.utils;
 
-import static java.lang.String.format;
-
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import java.io.IOException;
 import java.lang.invoke.MethodHandles;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
-import org.apache.commons.lang.StringUtils;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.apache.metron.common.utils.HDFSUtils;
-import org.apache.metron.common.utils.ReflectionUtils;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
-import org.apache.metron.netty.utils.NettyRuntimeWrapper;
-import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.codehaus.jackson.map.ObjectMapper;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ElasticsearchUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-  private static final String ES_CLIENT_CLASS_DEFAULT = "org.elasticsearch.transport.client.PreBuiltTransportClient";
-  private static final String PWD_FILE_CONFIG_KEY = "es.xpack.password.file";
-  private static final String USERNAME_CONFIG_KEY = "es.xpack.username";
-  private static final String TRANSPORT_CLIENT_USER_KEY = "xpack.security.user";
-
-
   private static ThreadLocal<Map<String, SimpleDateFormat>> DATE_FORMAT_CACHE
           = ThreadLocal.withInitial(() -> new HashMap<>());
 
@@ -79,10 +59,6 @@
    */
   public static final String INDEX_NAME_DELIMITER = "_index";
 
-  public static SimpleDateFormat getIndexFormat(WriterConfiguration configurations) {
-    return getIndexFormat(configurations.getGlobalConfig());
-  }
-
   public static SimpleDateFormat getIndexFormat(Map<String, Object> globalConfig) {
     String format = (String) globalConfig.get("es.date.format");
     return DATE_FORMAT_CACHE.get().computeIfAbsent(format, SimpleDateFormat::new);
@@ -103,135 +79,16 @@
     return indexName;
   }
 
-  /**
-   * Extracts the base index name from a full index name.
-   *
-   * For example, given an index named 'bro_index_2017.01.01.01', the base
-   * index name is 'bro'.
-   *
-   * @param indexName The full index name including delimiter and date postfix.
-   * @return The base index name.
-   */
-  public static String getBaseIndexName(String indexName) {
-
-    String[] parts = indexName.split(INDEX_NAME_DELIMITER);
-    if(parts.length < 1 || StringUtils.isEmpty(parts[0])) {
-      String msg = format("Unexpected index name; index=%s, delimiter=%s", indexName, INDEX_NAME_DELIMITER);
-      throw new IllegalStateException(msg);
-    }
-
-    return parts[0];
-  }
-
-  /**
-   * Instantiates an Elasticsearch client based on es.client.class, if set. Defaults to
-   * org.elasticsearch.transport.client.PreBuiltTransportClient.
-   *
-   * @param globalConfiguration Metron global config
-   * @return
-   */
-  public static TransportClient getClient(Map<String, Object> globalConfiguration) {
-    Set<String> customESSettings = new HashSet<>();
-    customESSettings.addAll(Arrays.asList("es.client.class", USERNAME_CONFIG_KEY, PWD_FILE_CONFIG_KEY));
-    Settings.Builder settingsBuilder = Settings.builder();
-    Map<String, String> esSettings = getEsSettings(globalConfiguration);
-    for (Map.Entry<String, String> entry : esSettings.entrySet()) {
-      String key = entry.getKey();
-      String value = entry.getValue();
-      if (!customESSettings.contains(key)) {
-        settingsBuilder.put(key, value);
-      }
-    }
-    settingsBuilder.put("cluster.name", globalConfiguration.get("es.clustername"));
-    settingsBuilder.put("client.transport.ping_timeout", esSettings.getOrDefault("client.transport.ping_timeout","500s"));
-    setXPackSecurityOrNone(settingsBuilder, esSettings);
-
-    try {
-      LOG.info("Number of available processors in Netty: {}", NettyRuntimeWrapper.availableProcessors());
-      // Netty sets available processors statically and if an attempt is made to set it more than
-      // once an IllegalStateException is thrown by NettyRuntime.setAvailableProcessors(NettyRuntime.java:87)
-      // https://discuss.elastic.co/t/getting-availableprocessors-is-already-set-to-1-rejecting-1-illegalstateexception-exception/103082
-      // https://discuss.elastic.co/t/elasticsearch-5-4-1-availableprocessors-is-already-set/88036
-      System.setProperty("es.set.netty.runtime.available.processors", "false");
-      TransportClient client = createTransportClient(settingsBuilder.build(), esSettings);
-      for (HostnamePort hp : getIps(globalConfiguration)) {
-        client.addTransportAddress(
-                new InetSocketTransportAddress(InetAddress.getByName(hp.hostname), hp.port)
-        );
-      }
-      return client;
-    } catch (UnknownHostException exception) {
-      throw new RuntimeException(exception);
-    }
-  }
-
-  private static Map<String, String> getEsSettings(Map<String, Object> config) {
-    return ConversionUtils
-        .convertMap((Map<String, Object>) config.getOrDefault("es.client.settings", new HashMap<String, Object>()),
-            String.class);
-  }
-
-  /*
-   * Append Xpack security settings (if any)
-   */
-  private static void setXPackSecurityOrNone(Settings.Builder settingsBuilder, Map<String, String> esSettings) {
-
-    if (esSettings.containsKey(PWD_FILE_CONFIG_KEY)) {
-
-      if (!esSettings.containsKey(USERNAME_CONFIG_KEY) || StringUtils.isEmpty(esSettings.get(USERNAME_CONFIG_KEY))) {
-        throw new IllegalArgumentException("X-pack username is required and cannot be empty");
-      }
-
-      settingsBuilder.put(
-         TRANSPORT_CLIENT_USER_KEY,
-         esSettings.get(USERNAME_CONFIG_KEY) + ":" + getPasswordFromFile(esSettings.get(PWD_FILE_CONFIG_KEY))
-      );
-    }
-  }
-
-  /*
-   * Single password on first line
-   */
-  private static String getPasswordFromFile(String hdfsPath) {
-    List<String> lines = null;
-    try {
-      lines = HDFSUtils.readFile(hdfsPath);
-    } catch (IOException e) {
-      throw new IllegalArgumentException(
-          format("Unable to read XPack password file from HDFS location '%s'", hdfsPath), e);
-    }
-    if (lines.size() == 0) {
-      throw new IllegalArgumentException(format("No password found in file '%s'", hdfsPath));
-    }
-    return lines.get(0);
-  }
-
-  /**
-   * Constructs ES transport client from the provided ES settings additional es config
-   *
-   * @param settings client settings
-   * @param esSettings client type to instantiate
-   * @return client with provided settings
-   */
-  private static TransportClient createTransportClient(Settings settings,
-      Map<String, String> esSettings) {
-    String esClientClassName = (String) esSettings
-        .getOrDefault("es.client.class", ES_CLIENT_CLASS_DEFAULT);
-    return ReflectionUtils
-        .createInstance(esClientClassName, new Class[]{Settings.class, Class[].class},
-            new Object[]{settings, new Class[0]});
-  }
-
   public static class HostnamePort {
-    String hostname;
-    Integer port;
+    public String hostname;
+    public Integer port;
     public HostnamePort(String hostname, Integer port) {
       this.hostname = hostname;
       this.port = port;
     }
   }
 
-  protected static List<HostnamePort> getIps(Map<String, Object> globalConfiguration) {
+  public static List<HostnamePort> getIps(Map<String, Object> globalConfiguration) {
     Object ipObj = globalConfiguration.get("es.ip");
     Object portObj = globalConfiguration.get("es.port");
     if(ipObj == null) {
@@ -335,30 +192,29 @@
    * @param qb A QueryBuilder that provides the query to be run.
    * @return A SearchResponse containing the appropriate results.
    */
-  public static  SearchResponse queryAllResults(TransportClient transportClient,
+  public static  SearchResponse queryAllResults(RestHighLevelClient transportClient,
       QueryBuilder qb,
       String index,
       int pageSize
-  ) {
-    SearchRequestBuilder searchRequestBuilder = transportClient
-        .prepareSearch(index)
-        .addStoredField("*")
-        .setFetchSource(true)
-        .setQuery(qb)
-        .setSize(pageSize);
-    org.elasticsearch.action.search.SearchResponse esResponse = searchRequestBuilder
-        .execute()
-        .actionGet();
+  ) throws IOException {
+    org.elasticsearch.action.search.SearchRequest request = new org.elasticsearch.action.search.SearchRequest();
+    SearchSourceBuilder builder = new SearchSourceBuilder();
+    builder.query(qb);
+    builder.size(pageSize);
+    builder.fetchSource(true);
+    builder.storedField("*");
+    request.source(builder);
+    request.indices(index);
+
+    org.elasticsearch.action.search.SearchResponse esResponse = transportClient.search(request);
     List<SearchResult> allResults = getSearchResults(esResponse);
     long total = esResponse.getHits().getTotalHits();
     if (total > pageSize) {
       int pages = (int) (total / pageSize) + 1;
       for (int i = 1; i < pages; i++) {
         int from = i * pageSize;
-        searchRequestBuilder.setFrom(from);
-        esResponse = searchRequestBuilder
-            .execute()
-            .actionGet();
+        builder.from(from);
+        esResponse = transportClient.search(request);
         allResults.addAll(getSearchResults(esResponse));
       }
     }
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java
new file mode 100644
index 0000000..15bcb4c
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldMapping.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.utils;
+
+import org.apache.commons.collections4.map.AbstractMapDecorator;
+
+import java.util.HashMap;
+
+/**
+ * Typedef that maps an Elasticsearch field name to that field's properties.
+ */
+public class FieldMapping extends AbstractMapDecorator<String, FieldProperties>{
+  public FieldMapping() {
+    super(new HashMap<String, FieldProperties>());
+  }
+
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java
new file mode 100644
index 0000000..d116b40
--- /dev/null
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/FieldProperties.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.utils;
+
+import org.apache.commons.collections4.map.AbstractMapDecorator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Typedef that maps the property names of an Elasticsearch field (e.g. its type) to their values.
+ */
+public class FieldProperties extends AbstractMapDecorator<String, Object> {
+  public FieldProperties() {
+    super(new HashMap<>());
+  }
+
+  public FieldProperties(Map<String, Object> m) {
+    super(m);
+  }
+}
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index 4b8dd08..fbdd4fe 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -23,14 +23,15 @@
 import org.apache.metron.common.field.FieldNameConverters;
 import org.apache.metron.common.writer.BulkMessageWriter;
 import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
 import org.elasticsearch.action.bulk.BulkItemResponse;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.action.index.IndexRequest;
 import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,7 +54,7 @@
   /**
    * The Elasticsearch client.
    */
-  private transient TransportClient client;
+  private transient ElasticsearchClient client;
 
   /**
    * A simple data formatter used to build the appropriate Elasticsearch index name.
@@ -65,7 +66,7 @@
   public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) {
 
     Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
-    client = ElasticsearchUtils.getClient(globalConfiguration);
+    client = ElasticsearchClientFactory.create(globalConfiguration);
     dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
   }
 
@@ -76,7 +77,7 @@
     FieldNameConverter fieldNameConverter = FieldNameConverters.create(sensorType, configurations);
 
     final String indexPostfix = dateFormat.format(new Date());
-    BulkRequestBuilder bulkRequest = client.prepareBulk();
+    BulkRequest bulkRequest = new BulkRequest();
     for(JSONObject message: messages) {
 
       JSONObject esDoc = new JSONObject();
@@ -85,22 +86,21 @@
       }
 
       String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, configurations);
-      IndexRequestBuilder indexRequestBuilder = client.prepareIndex(indexName, sensorType + "_doc");
-      indexRequestBuilder = indexRequestBuilder.setSource(esDoc.toJSONString());
+      IndexRequest indexRequest = new IndexRequest(indexName, sensorType + "_doc");
+      indexRequest.source(esDoc.toJSONString());
       String guid = (String)esDoc.get(Constants.GUID);
       if(guid != null) {
-        indexRequestBuilder.setId(guid);
+        indexRequest.id(guid);
       }
 
       Object ts = esDoc.get("timestamp");
       if(ts != null) {
-        indexRequestBuilder = indexRequestBuilder.setTimestamp(ts.toString());
+        indexRequest.timestamp(ts.toString());
       }
-
-      bulkRequest.add(indexRequestBuilder);
+      bulkRequest.add(indexRequest);
     }
 
-    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
+    BulkResponse bulkResponse = client.getHighLevelClient().bulk(bulkRequest);
     return buildWriteReponse(tuples, bulkResponse);
   }
 
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
index 0a83ee0..c9389c0 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
@@ -18,24 +18,22 @@
 
 package org.apache.metron.elasticsearch.dao;
 
-import org.elasticsearch.action.ActionFuture;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder;
-import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
-import org.elasticsearch.client.AdminClient;
-import org.elasticsearch.client.IndicesAdminClient;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.utils.FieldMapping;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Tests the ElasticsearchColumnMetadata class.
@@ -47,7 +45,7 @@
    * @return An object to test.
    */
   public ElasticsearchColumnMetadataDao setup(String[] indices) {
-    return setup(indices, ImmutableOpenMap.of());
+    return setup(indices, new HashMap<>());
   }
 
   /**
@@ -57,32 +55,23 @@
    */
   public ElasticsearchColumnMetadataDao setup(
           String[] indices,
-          ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings) {
+          Map<String, FieldMapping> mappings) {
+    ElasticsearchClient client = new ElasticsearchClient(mock(RestClient.class), mock(RestHighLevelClient.class)) {
+      @Override
+      public String[] getIndices() throws IOException {
+        return indices;
+      }
 
-    AdminClient adminClient = mock(AdminClient.class);
-    IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
-    GetIndexRequestBuilder getIndexRequestBuilder = mock(GetIndexRequestBuilder.class);
-    GetIndexResponse getIndexResponse = mock(GetIndexResponse.class);
-    ActionFuture getMappingsActionFuture = mock(ActionFuture.class);
-    GetMappingsResponse getMappingsResponse = mock(GetMappingsResponse.class);
-
-    // setup the mocks so that a set of indices are available to the DAO
-    when(adminClient.indices()).thenReturn(indicesAdminClient);
-    when(indicesAdminClient.prepareGetIndex()).thenReturn(getIndexRequestBuilder);
-    when(getIndexRequestBuilder.setFeatures()).thenReturn(getIndexRequestBuilder);
-    when(getIndexRequestBuilder.get()).thenReturn(getIndexResponse);
-    when(getIndexResponse.getIndices()).thenReturn(indices);
-
-    // setup the mocks so that a set of mappings are available to the DAO
-    when(indicesAdminClient.getMappings(any())).thenReturn(getMappingsActionFuture);
-    when(getMappingsActionFuture.actionGet()).thenReturn(getMappingsResponse);
-    when(getMappingsResponse.getMappings()).thenReturn(mappings);
-
-    return new ElasticsearchColumnMetadataDao(adminClient);
+      @Override
+      public Map<String, FieldMapping> getMappingByIndex(String[] indices) throws IOException {
+        return mappings;
+      }
+    };
+    return new ElasticsearchColumnMetadataDao(client);
   }
 
   @Test
-  public void testGetOneLatestIndex() {
+  public void testGetOneLatestIndex() throws IOException {
 
     // setup
     String[] existingIndices = new String[] {
@@ -105,7 +94,7 @@
   }
 
   @Test
-  public void testGetLatestIndices() {
+  public void testGetLatestIndices() throws IOException {
     // setup
     String[] existingIndices = new String[] {
             "bro_index_2017.10.03.19",
@@ -127,7 +116,7 @@
   }
 
   @Test
-  public void testLatestIndicesWhereNoneExist() {
+  public void testLatestIndicesWhereNoneExist() throws IOException {
 
     // setup - there are no existing indices
     String[] existingIndices = new String[] {};
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
index 6c3c327..6dc01a4 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
@@ -29,6 +29,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.search.FieldType;
@@ -37,7 +39,8 @@
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SortField;
 import org.apache.metron.indexing.dao.search.SortOrder;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
@@ -90,7 +93,8 @@
     requestSubmitter = mock(ElasticsearchRequestSubmitter.class);
     when(requestSubmitter.submitSearch(any())).thenReturn(response);
 
-    TransportClient client = mock(TransportClient.class);
+    RestHighLevelClient highLevel = mock(RestHighLevelClient.class);
+    ElasticsearchClient client = new ElasticsearchClient(mock(RestClient.class), highLevel);
 
     // provides configuration
     AccessConfig config = mock(AccessConfig.class);
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
index 07019c3..7a84588 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
@@ -18,17 +18,19 @@
 
 package org.apache.metron.elasticsearch.dao;
 
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
-import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.ShardSearchFailure;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchShardTarget;
 import org.junit.Test;
-import org.mockito.Mockito;
+
+import java.io.IOException;
 
 import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
@@ -39,21 +41,20 @@
 
   private ElasticsearchRequestSubmitter submitter;
 
-  public ElasticsearchRequestSubmitter setup(SearchResponse response) {
+  public ElasticsearchRequestSubmitter setup(SearchResponse response) throws IOException {
 
     // mocks
-    TransportClient client = mock(TransportClient.class);
-    ActionFuture future = Mockito.mock(ActionFuture.class);
+    RestHighLevelClient highLevelClient = mock(RestHighLevelClient.class);
+    ElasticsearchClient client = new ElasticsearchClient(mock(RestClient.class), highLevelClient);
 
     // the client should return the given search response
-    when(client.search(any())).thenReturn(future);
-    when(future.actionGet()).thenReturn(response);
+    when(highLevelClient.search(any())).thenReturn(response);
 
     return new ElasticsearchRequestSubmitter(client);
   }
 
   @Test
-  public void searchShouldSucceedWhenOK() throws InvalidSearchException {
+  public void searchShouldSucceedWhenOK() throws InvalidSearchException, IOException {
 
     // mocks
     SearchResponse response = mock(SearchResponse.class);
@@ -71,7 +72,7 @@
   }
 
   @Test(expected = InvalidSearchException.class)
-  public void searchShouldFailWhenNotOK() throws InvalidSearchException {
+  public void searchShouldFailWhenNotOK() throws InvalidSearchException, IOException {
 
     // mocks
     SearchResponse response = mock(SearchResponse.class);
@@ -88,7 +89,7 @@
   }
 
   @Test
-  public void searchShouldHandleShardFailure() throws InvalidSearchException {
+  public void searchShouldHandleShardFailure() throws InvalidSearchException, IOException {
     // mocks
     SearchResponse response = mock(SearchResponse.class);
     SearchRequest request = new SearchRequest();
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDaoTest.java
index 3b48a60..3b7f132 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDaoTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDaoTest.java
@@ -18,30 +18,32 @@
 
 package org.apache.metron.elasticsearch.dao;
 
+import static org.mockito.Mockito.mock;
+
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.UpdateDaoTest;
 import org.apache.metron.indexing.dao.update.UpdateDao;
-import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.junit.Before;
 
-import static org.mockito.Mockito.mock;
-
 /**
  * This class returns the ElasticsearchUpdateDao implementation to be used in UpdateDaoTest.  UpdateDaoTest contains a
  * common set of tests that all Dao implementations must pass.
  */
 public class ElasticsearchUpdateDaoTest extends UpdateDaoTest {
 
-  private TransportClient client;
   private AccessConfig accessConfig;
   private ElasticsearchRetrieveLatestDao retrieveLatestDao;
   private ElasticsearchUpdateDao updateDao;
 
   @Before
   public void setup() {
-    client = mock(TransportClient.class);
     accessConfig = new AccessConfig();
     retrieveLatestDao = mock(ElasticsearchRetrieveLatestDao.class);
+    RestHighLevelClient highLevel = mock(RestHighLevelClient.class);
+    ElasticsearchClient client = new ElasticsearchClient(mock(RestClient.class), highLevel);
     updateDao = new ElasticsearchUpdateDao(client, accessConfig, retrieveLatestDao);
   }
 
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
index c05efc1..03b1639 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
@@ -144,7 +144,7 @@
     Map<String, Object> globalConfig = new HashMap<String, Object>() {
       {
         put("es.clustername", "metron");
-        put("es.port", "9300");
+        put("es.port", "9200");
         put("es.ip", "localhost");
         put("es.date.format", DATE_FORMAT);
       }
@@ -334,11 +334,8 @@
   }
 
   @Override
-  protected void setupTypings() {
-    ((ElasticsearchDao) esDao).getClient().admin().indices().preparePutMapping(INDEX)
-            .setType("test_doc")
-            .setSource(nestedAlertMapping)
-            .get();
+  protected void setupTypings() throws IOException {
+    ((ElasticsearchDao) esDao).getClient().putMapping(INDEX, "test_doc", nestedAlertMapping);
   }
 
   @Override
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
index 8187468..d03da0e 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
@@ -18,6 +18,10 @@
 package org.apache.metron.elasticsearch.integration;
 
 
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
@@ -25,8 +29,13 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
 import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
 import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
 import org.apache.metron.indexing.dao.AccessConfig;
@@ -39,10 +48,13 @@
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.integration.InMemoryComponent;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
@@ -57,33 +69,38 @@
   private static String dateFormat = "yyyy.MM.dd.HH";
   private static String broTemplatePath = "../../metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template";
   private static String snortTemplatePath = "../../metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template";
-  private static final int MAX_RETRIES = 10;
-  private static final int SLEEP_MS = 500;
+  protected static final String BRO_INDEX = "bro_index_2017.01.01.01";
+  protected static final String SNORT_INDEX = "snort_index_2017.01.01.02";
+  protected static Map<String, Object> globalConfig;
+  protected static RestClient lowLevelClient;
+  protected static RestHighLevelClient highLevelClient;
   protected static IndexDao dao;
 
   @BeforeClass
   public static void setup() throws Exception {
     indexComponent = startIndex();
-    dao = createDao();
+    globalConfig = new HashMap<String, Object>() {{
+      put("es.clustername", "metron");
+      put("es.port", "9200");
+      put("es.ip", "localhost");
+      put("es.date.format", dateFormat);
+    }};
+    ElasticsearchClient esClient = ElasticsearchClientFactory.create(globalConfig);
+    lowLevelClient = esClient.getLowLevelClient();
+    highLevelClient = esClient.getHighLevelClient();
+    dao = createDao(globalConfig);
     // The data is all static for searches, so we can set it up beforehand, and it's faster
     loadTestData();
   }
 
-  protected static IndexDao createDao() {
-    AccessConfig config = new AccessConfig();
-    config.setMaxSearchResults(100);
-    config.setMaxSearchGroups(100);
-    config.setGlobalConfigSupplier( () ->
-            new HashMap<String, Object>() {{
-              put("es.clustername", "metron");
-              put("es.port", "9300");
-              put("es.ip", "localhost");
-              put("es.date.format", dateFormat);
-            }}
-    );
+  protected static IndexDao createDao(Map<String, Object> globalConfig) {
+    AccessConfig accessConfig = new AccessConfig();
+    accessConfig.setMaxSearchResults(100);
+    accessConfig.setMaxSearchGroups(100);
+    accessConfig.setGlobalConfigSupplier(() -> globalConfig);
 
     IndexDao dao = new ElasticsearchDao();
-    dao.init(config);
+    dao.init(accessConfig);
     return dao;
   }
 
@@ -97,51 +114,64 @@
   }
 
   protected static void loadTestData() throws ParseException, IOException {
-    ElasticSearchComponent es = (ElasticSearchComponent) indexComponent;
-
+    // add bro template: load it, merge in the test field mappings, then PUT it to /_template
     JSONObject broTemplate = JSONUtils.INSTANCE.load(new File(broTemplatePath), JSONObject.class);
     addTestFieldMappings(broTemplate, "bro_doc");
-    es.getClient().admin().indices().prepareCreate("bro_index_2017.01.01.01")
-        .addMapping("bro_doc", JSONUtils.INSTANCE.toJSON(broTemplate.get("mappings"), false)).get();
+    String broTemplateJson = JSONUtils.INSTANCE.toJSON(broTemplate, true);
+    HttpEntity broEntity = new NStringEntity(broTemplateJson, ContentType.APPLICATION_JSON);
+    Response response = lowLevelClient.performRequest("PUT", "/_template/bro_template", Collections.emptyMap(), broEntity);
+    assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
+    // add snort template the same way
     JSONObject snortTemplate = JSONUtils.INSTANCE.load(new File(snortTemplatePath), JSONObject.class);
     addTestFieldMappings(snortTemplate, "snort_doc");
-    es.getClient().admin().indices().prepareCreate("snort_index_2017.01.01.02")
-        .addMapping("snort_doc", JSONUtils.INSTANCE.toJSON(snortTemplate.get("mappings"), false)).get();
+    String snortTemplateJson = JSONUtils.INSTANCE.toJSON(snortTemplate, true);
+    HttpEntity snortEntity = new NStringEntity(snortTemplateJson, ContentType.APPLICATION_JSON);
+    response = lowLevelClient.performRequest("PUT", "/_template/snort_template", Collections.emptyMap(), snortEntity);
+    assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
+    // create bro index; the leading '/' keeps the endpoint an absolute path, like the template calls above
+    response = lowLevelClient.performRequest("PUT", "/" + BRO_INDEX);
+    assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
+    // create snort index (same absolute-path endpoint form)
+    response = lowLevelClient.performRequest("PUT", "/" + SNORT_INDEX);
+    assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
 
-    BulkRequestBuilder bulkRequest = es.getClient().prepareBulk()
-        .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
-    JSONArray broArray = (JSONArray) new JSONParser().parse(broData);
-    for (Object o : broArray) {
-      JSONObject jsonObject = (JSONObject) o;
-      IndexRequestBuilder indexRequestBuilder = es.getClient()
-          .prepareIndex("bro_index_2017.01.01.01", "bro_doc");
-      indexRequestBuilder = indexRequestBuilder.setId((String) jsonObject.get("guid"));
-      indexRequestBuilder = indexRequestBuilder.setSource(jsonObject.toJSONString());
-      indexRequestBuilder = indexRequestBuilder
-          .setTimestamp(jsonObject.get("timestamp").toString());
-      bulkRequest.add(indexRequestBuilder);
+    JSONArray broRecords = (JSONArray) new JSONParser().parse(broData);
+
+    BulkRequest bulkRequest = new BulkRequest();
+    for (Object o : broRecords) {
+      JSONObject json = (JSONObject) o;
+      IndexRequest indexRequest = new IndexRequest(BRO_INDEX, "bro_doc", (String) json.get("guid"));
+      indexRequest.source(json);
+      indexRequest.timestamp(json.get("timestamp").toString());
+      bulkRequest.add(indexRequest);
     }
-    JSONArray snortArray = (JSONArray) new JSONParser().parse(snortData);
-    for (Object o : snortArray) {
-      JSONObject jsonObject = (JSONObject) o;
-      IndexRequestBuilder indexRequestBuilder = es.getClient()
-          .prepareIndex("snort_index_2017.01.01.02", "snort_doc");
-      indexRequestBuilder = indexRequestBuilder.setId((String) jsonObject.get("guid"));
-      indexRequestBuilder = indexRequestBuilder.setSource(jsonObject.toJSONString());
-      indexRequestBuilder = indexRequestBuilder
-          .setTimestamp(jsonObject.get("timestamp").toString());
-      bulkRequest.add(indexRequestBuilder);
+    bulkRequest.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
+    BulkResponse bulkResponse = highLevelClient.bulk(bulkRequest);
+    assertFalse(bulkResponse.hasFailures());
+    assertThat(bulkResponse.status().getStatus(), equalTo(200));
+
+    JSONArray snortRecords = (JSONArray) new JSONParser().parse(snortData);
+
+    bulkRequest = new BulkRequest();
+    for (Object o : snortRecords) {
+      JSONObject json = (JSONObject) o;
+      IndexRequest indexRequest = new IndexRequest(SNORT_INDEX, "snort_doc", (String) json.get("guid"));
+      indexRequest.source(json);
+      indexRequest.timestamp(json.get("timestamp").toString());
+      bulkRequest.add(indexRequest);
     }
-    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
-    if (bulkResponse.hasFailures()) {
-      throw new RuntimeException("Failed to index test data");
-    }
+    bulkRequest.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
+    bulkResponse = highLevelClient.bulk(bulkRequest);
+    assertFalse(bulkResponse.hasFailures());
+    assertThat(bulkResponse.status().getStatus(), equalTo(200));
   }
 
   /**
    * Add test fields to a template with defined types in case they are not defined in the sensor template shipped with Metron.
    * This is useful for testing certain cases, for example faceting on fields of various types.
-   * @param template
+   * Template follows this pattern:
+   * { "mappings" : { "xxx_doc" : { "properties" : { ... }}}}
+   * @param template the template to augment; it is modified in place with the test field mappings.
    * @param docType
    */
   private static void addTestFieldMappings(JSONObject template, String docType) {
@@ -167,12 +197,11 @@
     dao.search(request);
   }
 
-
-
   @Override
   public void returns_column_metadata_for_specified_indices() throws Exception {
     // getColumnMetadata with only bro
     {
+      Assert.assertEquals(262, dao.getColumnMetadata(Collections.singletonList("bro")).size());
       Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("bro"));
       Assert.assertEquals(262, fieldTypes.size());
       Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("method"));
@@ -187,11 +216,11 @@
       Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
       Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
       Assert.assertEquals(FieldType.TEXT, fieldTypes.get("location_point"));
-      Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("ttl"));
       Assert.assertEquals(FieldType.OTHER, fieldTypes.get("metron_alert"));
     }
     // getColumnMetadata with only snort
     {
+      Assert.assertEquals(32, dao.getColumnMetadata(Collections.singletonList("snort")).size());
       Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("snort"));
       Assert.assertEquals(32, fieldTypes.size());
       Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("sig_generator"));
@@ -213,6 +242,7 @@
 
   @Override
   public void returns_column_data_for_multiple_indices() throws Exception {
+    Assert.assertEquals(277, dao.getColumnMetadata(Arrays.asList("bro", "snort")).size());
     Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Arrays.asList("bro", "snort"));
     Assert.assertEquals(277, fieldTypes.size());
 
@@ -273,9 +303,9 @@
   @Override
   protected String getIndexName(String sensorType) {
     if ("bro".equals(sensorType)) {
-      return "bro_index_2017.01.01.01";
+      return BRO_INDEX;
     } else {
-      return "snort_index_2017.01.01.02";
+      return SNORT_INDEX;
     }
   }
 }
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
index c5c0bc1..6f36790 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
@@ -104,7 +104,7 @@
   protected static Map<String, Object> createGlobalConfig() {
     return new HashMap<String, Object>() {{
       put("es.clustername", "metron");
-      put("es.port", "9300");
+      put("es.port", "9200");
       put("es.ip", "localhost");
       put("es.date.format", dateFormat);
     }};
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
index 45b4d60..3e14c00 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
@@ -135,7 +135,7 @@
         .put("path.data",dataDir.getAbsolutePath())
         .put("path.home", indexDir.getAbsoluteFile())
         .put("transport.type", "netty4")
-        .put("http.enabled", "false");
+        .put("http.enabled", "true");
 
     if (extraElasticSearchSettings != null) {
       settingsBuilder = settingsBuilder.put(extraElasticSearchSettings);
@@ -277,7 +277,9 @@
   @Override
   public void stop() {
     try {
-      node.close();
+      if(node != null) {
+        node.close();
+      }
     } catch (IOException e) {
       throw new RuntimeException("Unable to stop node." , e);
     }
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertSearchDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertSearchDao.java
index e8b9f26..cbbe9ee 100644
--- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertSearchDao.java
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertSearchDao.java
@@ -22,6 +22,8 @@
 import org.apache.metron.indexing.dao.search.SearchDao;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 
+import java.io.IOException;
+
 public interface MetaAlertSearchDao extends SearchDao {
 
   /**
@@ -30,6 +32,7 @@
    * @return All meta alerts with a child alert having the GUID
    * @throws InvalidSearchException If a problem occurs with the search
+   * @throws IOException If an I/O error occurs while performing the search
    */
-  SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException;
+  SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException, IOException;
 
 }
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java
index 7e28853..24989b4 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java
@@ -1095,7 +1095,7 @@
   protected abstract long getMatchingMetaAlertCount(String fieldName, String fieldValue)
       throws IOException, InterruptedException;
 
-  protected abstract void setupTypings();
+  protected abstract void setupTypings() throws IOException;
 
   // Get the base index name without any adjustments (e.g. without ES's "_index")
   protected abstract String getTestIndexName();
diff --git a/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json b/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
index 9292f72..f7d45a7 100644
--- a/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
+++ b/metron-platform/metron-integration-test/src/main/config/zookeeper/global.json
@@ -1,7 +1,7 @@
 {
   "es.clustername": "metron",
   "es.ip": "localhost",
-  "es.port": 9300,
+  "es.port": 9200,
   "es.date.format": "yyyy.MM.dd.HH",
 
   "solr.zookeeper": "localhost:2181",