Update mongodb driver version to 3.12.8

Update mongodb driver version to 3.12.8
diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java
index 795959d..9005803 100644
--- a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java
+++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikeMappingBuilder.java
@@ -18,10 +18,11 @@
 
 import com.aerospike.client.policy.GenerationPolicy;
 import com.aerospike.client.policy.Policy;
+import com.aerospike.client.policy.ReadModeAP;
+import com.aerospike.client.policy.ReadModeSC;
 import com.aerospike.client.policy.RecordExistsAction;
 import com.aerospike.client.policy.CommitLevel;
 import com.aerospike.client.policy.Priority;
-import com.aerospike.client.policy.ConsistencyLevel;
 import com.aerospike.client.policy.Replica;
 import com.aerospike.client.policy.WritePolicy;
 import org.jdom.Document;
@@ -118,14 +119,13 @@
           else if (policy.equals(AerospikePolicyConst.READ_POLICY_NAME)) {
 
             Policy readPolicy = new Policy();
-            if (policyElement.getAttributeValue(AerospikePolicyConst.PRIORITY_NAME) != null) {
-              readPolicy.priority = getPriority(
-                      policyElement.getAttributeValue(AerospikePolicyConst.PRIORITY_NAME));
+            if (policyElement.getAttributeValue(AerospikePolicyConst.READ_MODE_AP_NAME) != null) {
+              readPolicy.readModeAP = getReadModeAP(policyElement
+                      .getAttributeValue(AerospikePolicyConst.READ_MODE_AP_NAME));
             }
-            if (policyElement.getAttributeValue(AerospikePolicyConst.CONSISTENCY_LEVEL_NAME)
-                    != null) {
-              readPolicy.consistencyLevel = getConsistencyLevel(
-                      policyElement.getAttributeValue(AerospikePolicyConst.CONSISTENCY_LEVEL_NAME));
+            if (policyElement.getAttributeValue(AerospikePolicyConst.READ_MODE_SC_NAME) != null) {
+              readPolicy.readModeSC = getReadModeSC(policyElement
+                      .getAttributeValue(AerospikePolicyConst.READ_MODE_SC_NAME));
             }
             if (policyElement.getAttributeValue(AerospikePolicyConst.REPLICA_POLICY_NAME) != null) {
               readPolicy.replica = getReplicaPolicy(
@@ -307,24 +307,44 @@
     return Priority.DEFAULT;
   }
 
-  /**
-   * Returns the corresponding consistency level from the user specified consistency level name.
-   * The default value is CONSISTENCY_ONE
-   *
-   * @param consistencyLevel user specified consistency level name
-   * @return corresponding consistency level
-   */
-  private ConsistencyLevel getConsistencyLevel(String consistencyLevel) {
-    if (consistencyLevel == null)
-      return ConsistencyLevel.CONSISTENCY_ONE;
+  /**
+   * Returns the ReadModeAP value matching the user specified name (case-insensitive).
+   * The default value is ReadModeAP.ONE
+   *
+   * @param readModeAP user specified ReadModeAP name
+   * @return corresponding ReadModeAP value, or the default if the name is null or unknown
+   */
+  private ReadModeAP getReadModeAP(String readModeAP) {
+    if (readModeAP == null)
+      return ReadModeAP.ONE;
 
-    for (ConsistencyLevel consistencyLevelEnum : ConsistencyLevel.values()) {
-      if (consistencyLevel.equalsIgnoreCase(consistencyLevelEnum.toString())) {
-        return consistencyLevelEnum;
+    for (ReadModeAP readModeAPEnum : ReadModeAP.values()) {
+      if (readModeAP.equalsIgnoreCase(readModeAPEnum.toString())) {
+        return readModeAPEnum;
       }
     }
-    LOG.warn("Invalid consistency level provided, using the default consistency level.");
-    return ConsistencyLevel.CONSISTENCY_ONE;
+    LOG.warn("Invalid readModeAP value provided, using the default ReadModeAP level.");
+    return ReadModeAP.ONE;
+  }
+
+  /**
+   * Returns the ReadModeSC value matching the user specified name (case-insensitive).
+   * The default value is ReadModeSC.SESSION
+   *
+   * @param readModeSC user specified ReadModeSC name
+   * @return corresponding ReadModeSC value, or the default if the name is null or unknown
+   */
+  private ReadModeSC getReadModeSC(String readModeSC) {
+    if (readModeSC == null)
+      return ReadModeSC.SESSION;
+
+    for (ReadModeSC readModeSCEnum : ReadModeSC.values()) {
+      if (readModeSC.equalsIgnoreCase(readModeSCEnum.toString())) {
+        return readModeSCEnum;
+      }
+    }
+    LOG.warn("Invalid readModeSC value provided, using the default ReadModeSC level.");
+    return ReadModeSC.SESSION;
   }
 
   /**
diff --git a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikePolicyConst.java b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikePolicyConst.java
index eae312e..0a098cb 100644
--- a/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikePolicyConst.java
+++ b/gora-aerospike/src/main/java/org/apache/gora/aerospike/store/AerospikePolicyConst.java
@@ -37,9 +37,9 @@
 
   public static final String READ_POLICY_NAME = "read";
 
-  public static final String PRIORITY_NAME = "priority";
+  public static final String READ_MODE_AP_NAME = "readModeAP";
 
-  public static final String CONSISTENCY_LEVEL_NAME = "consistencyLevel";
+  public static final String READ_MODE_SC_NAME = "readModeSC";
 
   public static final String REPLICA_POLICY_NAME = "replica";
 
diff --git a/gora-aerospike/src/test/java/org/apache/gora/aerospike/mapreduce/TestAerospikeStoreCountQuery.java b/gora-aerospike/src/test/java/org/apache/gora/aerospike/mapreduce/TestAerospikeStoreCountQuery.java
index 72a2ba5..f66ce6a 100644
--- a/gora-aerospike/src/test/java/org/apache/gora/aerospike/mapreduce/TestAerospikeStoreCountQuery.java
+++ b/gora-aerospike/src/test/java/org/apache/gora/aerospike/mapreduce/TestAerospikeStoreCountQuery.java
@@ -35,7 +35,7 @@
  */
 public class TestAerospikeStoreCountQuery {
 
-  private static final String DOCKER_CONTAINER_NAME = "aerospike/aerospike-server:4.3.1.4";
+  private static final String DOCKER_CONTAINER_NAME = "aerospike/aerospike-server:5.5.0.7";
 
   @ClassRule
   public static GenericContainer aerospikeContainer = new GenericContainer(DOCKER_CONTAINER_NAME)
diff --git a/gora-aerospike/src/test/java/org/apache/gora/aerospike/mapreduce/TestAerospikeStoreMapReduceSerialization.java b/gora-aerospike/src/test/java/org/apache/gora/aerospike/mapreduce/TestAerospikeStoreMapReduceSerialization.java
index 58c1654..9b9abae 100644
--- a/gora-aerospike/src/test/java/org/apache/gora/aerospike/mapreduce/TestAerospikeStoreMapReduceSerialization.java
+++ b/gora-aerospike/src/test/java/org/apache/gora/aerospike/mapreduce/TestAerospikeStoreMapReduceSerialization.java
@@ -35,7 +35,7 @@
  */
 public class TestAerospikeStoreMapReduceSerialization {
 
-  private static final String DOCKER_CONTAINER_NAME = "aerospike/aerospike-server:4.3.1.4";
+  private static final String DOCKER_CONTAINER_NAME = "aerospike/aerospike-server:5.5.0.7";
 
   @ClassRule
   public static GenericContainer aerospikeContainer = new GenericContainer(DOCKER_CONTAINER_NAME)
diff --git a/gora-aerospike/src/test/java/org/apache/gora/aerospike/mapreduce/TestAerospikeStoreWordCount.java b/gora-aerospike/src/test/java/org/apache/gora/aerospike/mapreduce/TestAerospikeStoreWordCount.java
index 5e31f56..2e42190 100644
--- a/gora-aerospike/src/test/java/org/apache/gora/aerospike/mapreduce/TestAerospikeStoreWordCount.java
+++ b/gora-aerospike/src/test/java/org/apache/gora/aerospike/mapreduce/TestAerospikeStoreWordCount.java
@@ -36,7 +36,7 @@
  */
 public class TestAerospikeStoreWordCount {
 
-  private static final String DOCKER_CONTAINER_NAME = "aerospike/aerospike-server:4.3.1.4";
+  private static final String DOCKER_CONTAINER_NAME = "aerospike/aerospike-server:5.5.0.7";
 
   @ClassRule
   public static GenericContainer aerospikeContainer = new GenericContainer(DOCKER_CONTAINER_NAME)
diff --git a/gora-aerospike/src/test/java/org/apache/gora/aerospike/store/TestAerospikeStore.java b/gora-aerospike/src/test/java/org/apache/gora/aerospike/store/TestAerospikeStore.java
index 97f33d1..9715fe7 100644
--- a/gora-aerospike/src/test/java/org/apache/gora/aerospike/store/TestAerospikeStore.java
+++ b/gora-aerospike/src/test/java/org/apache/gora/aerospike/store/TestAerospikeStore.java
@@ -39,7 +39,7 @@
  */
 public class TestAerospikeStore extends DataStoreTestBase {
 
-  private static final String DOCKER_CONTAINER_NAME = "aerospike/aerospike-server:4.3.1.4";
+  private static final String DOCKER_CONTAINER_NAME = "aerospike/aerospike-server:5.5.0.7";
 
   @ClassRule
   public static GenericContainer aerospikeContainer = new GenericContainer(DOCKER_CONTAINER_NAME)
diff --git a/gora-elasticsearch/pom.xml b/gora-elasticsearch/pom.xml
new file mode 100644
index 0000000..28ba932
--- /dev/null
+++ b/gora-elasticsearch/pom.xml
@@ -0,0 +1,177 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+	Unless required by applicable law or agreed to in writing,
+	software distributed under the License is distributed on an
+	"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+	KIND, either express or implied.  See the License for the
+	specific language governing permissions and limitations
+	under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.gora</groupId>
+        <artifactId>gora</artifactId>
+        <version>1.0-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+    <artifactId>gora-elasticsearch</artifactId>
+    <packaging>bundle</packaging>
+
+    <name>Apache Gora :: Elasticsearch</name>
+    <url>http://gora.apache.org</url>
+    <description>The Apache Gora open source framework provides an in-memory data model and
+        persistence for big data. Gora supports persisting to column stores, key value stores,
+        document stores and RDBMSs, and analyzing the data with extensive Apache Hadoop MapReduce
+        support.</description>
+    <inceptionYear>2010</inceptionYear>
+    <organization>
+        <name>The Apache Software Foundation</name>
+        <url>http://www.apache.org/</url>
+    </organization>
+    <issueManagement>
+        <system>JIRA</system>
+        <url>https://issues.apache.org/jira/browse/GORA</url>
+    </issueManagement>
+    <ciManagement>
+        <system>Jenkins</system>
+        <url>https://builds.apache.org/job/Gora-trunk/</url>
+    </ciManagement>
+
+    <properties>
+        <osgi.import>*</osgi.import>
+        <osgi.export>org.apache.gora.elasticsearch*;version="${project.version}";-noimport:=true</osgi.export>
+    </properties>
+
+    <build>
+        <directory>target</directory>
+        <outputDirectory>target/classes</outputDirectory>
+        <finalName>${project.artifactId}-${project.version}</finalName>
+        <testOutputDirectory>target/test-classes</testOutputDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testResources>
+            <testResource>
+                <directory>${project.basedir}/src/test/resources</directory>
+                <includes>
+                    <include>**/*</include>
+                </includes>
+                <!--targetPath>${project.basedir}/target/classes/</targetPath -->
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>${build-helper-maven-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>src/examples/java</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+    <dependencies>
+        <!-- Gora Internal Dependencies -->
+        <dependency>
+            <groupId>org.apache.gora</groupId>
+            <artifactId>gora-core</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.gora</groupId>
+            <artifactId>gora-core</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Elasticsearch dependencies -->
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-high-level-client</artifactId>
+            <version>${elasticsearch.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.jdom</groupId>
+            <artifactId>jdom</artifactId>
+            <scope>compile</scope>
+        </dependency>
+
+        <!-- Logging Dependencies -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-to-slf4j</artifactId>
+            <version>2.8.2</version>
+        </dependency>
+
+        <!-- Testing Dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-minicluster</artifactId>
+        </dependency>
+
+    </dependencies>
+
+</project>
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java
new file mode 100644
index 0000000..9c6704f
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMapping.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.mapping;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Mapping definitions for Elasticsearch: an index name plus the mapped fields keyed by class field name.
+ */
+public class ElasticsearchMapping {
+
+  private String indexName;
+  private Map<String, Field> fields;
+
+  /**
+   * Creates a mapping with an empty field map; the index name stays unset (null) until assigned.
+   */
+  public ElasticsearchMapping() {
+    fields = new HashMap<>();
+  }
+
+  /**
+   * Returns the name of Elasticsearch index linked to the mapping.
+   *
+   * @return Index's name, or null if it has not been set yet
+   */
+  public String getIndexName() {
+    return indexName;
+  }
+
+  /**
+   * Sets the index name of the Elasticsearch mapping.
+   *
+   * @param indexName Index's name
+   */
+  public void setIndexName(String indexName) {
+    this.indexName = indexName;
+  }
+
+  /**
+   * Returns the live internal map (not a copy) of all mapped fields, keyed by class field name.
+   *
+   * @return Map containing mapped fields
+   */
+  public Map<String, Field> getFields() {
+    return fields;
+  }
+
+  /**
+   * Adds a field to the mapped fields, replacing any field already stored under the same name.
+   *
+   * @param classFieldName Field name in the persisted class
+   * @param field          Mapped field from Elasticsearch index
+   */
+  public void addField(String classFieldName, Field field) {
+    fields.put(classFieldName, field);
+  }
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java
new file mode 100644
index 0000000..31ad474
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/ElasticsearchMappingBuilder.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.mapping;
+
+import com.google.inject.ConfigurationException;
+import org.apache.commons.io.IOUtils;
+import org.apache.gora.elasticsearch.store.ElasticsearchStore;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.util.GoraException;
+import org.jdom.Document;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+import org.jdom.input.SAXBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.xml.sax.SAXException;
+
+import javax.xml.XMLConstants;
+import javax.xml.transform.Source;
+import javax.xml.transform.stream.StreamSource;
+import javax.xml.validation.Schema;
+import javax.xml.validation.SchemaFactory;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Locale;
+
+/**
+ * Builder for Mapping definitions of Elasticsearch.
+ */
+public class ElasticsearchMappingBuilder<K, T extends PersistentBase> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchMappingBuilder.class);
+
+  /**
+   * XSD validation file for the XML mapping.
+   */
+  private static final String XSD_MAPPING_FILE = "gora-elasticsearch.xsd";
+
+  // Index description
+  static final String ATT_NAME = "name";
+
+  static final String ATT_TYPE = "type";
+
+  // Class description
+  static final String TAG_CLASS = "class";
+
+  static final String ATT_KEYCLASS = "keyClass";
+
+  static final String ATT_INDEX = "index";
+
+  static final String TAG_FIELD = "field";
+
+  static final String ATT_DOCFIELD = "docfield";
+
+  static final String ATT_SCALINGFACTOR = "scalingFactor";
+
+  /**
+   * Mapping instance being built.
+   */
+  private ElasticsearchMapping elasticsearchMapping;
+
+  private final ElasticsearchStore<K, T> dataStore;
+
+  /**
+   * Constructor for ElasticsearchMappingBuilder.
+   *
+   * @param store ElasticsearchStore instance
+   */
+  public ElasticsearchMappingBuilder(final ElasticsearchStore<K, T> store) {
+    this.elasticsearchMapping = new ElasticsearchMapping();
+    this.dataStore = store;
+  }
+
+  /**
+   * Returns the Elasticsearch Mapping being built.
+   *
+   * @return Elasticsearch Mapping instance
+   */
+  public ElasticsearchMapping getElasticsearchMapping() {
+    return elasticsearchMapping;
+  }
+
+  /**
+   * Sets the Elasticsearch Mapping.
+   *
+   * @param elasticsearchMapping Elasticsearch Mapping instance
+   */
+  public void setElasticsearchMapping(ElasticsearchMapping elasticsearchMapping) {
+    this.elasticsearchMapping = elasticsearchMapping;
+  }
+
+  /**
+   * Reads the Elasticsearch mapping from the stream and loads the first class element matching the store's key and persistent classes.
+   *
+   * @param inputStream   Mapping input stream; IOException, JDOMException, ConfigurationException and SAXException are rethrown wrapped in a RuntimeException
+   * @param xsdValidation Parameter for enabling XSD validation
+   */
+  public void readMappingFile(InputStream inputStream, boolean xsdValidation) {
+    try {
+      SAXBuilder saxBuilder = new SAXBuilder();
+      if (inputStream == null) {
+        LOG.error("The mapping input stream is null!");
+        throw new GoraException("The mapping input stream is null!");
+      }
+
+      // Buffer the raw bytes (no charset round-trip) so each parser decodes them using the document's own XML encoding
+      byte[] mappingBytes = IOUtils.toByteArray(inputStream);
+
+      // XSD validation for XML file
+      if (xsdValidation) {
+        Source xmlSource = new StreamSource(new java.io.ByteArrayInputStream(mappingBytes));
+        Schema schema = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI)
+                .newSchema(new StreamSource(getClass().getClassLoader().getResourceAsStream(XSD_MAPPING_FILE)));
+        schema.newValidator().validate(xmlSource);
+        LOG.info("Mapping file is valid.");
+      }
+
+      Document document = saxBuilder.build(new java.io.ByteArrayInputStream(mappingBytes));
+      if (document == null) {
+        LOG.error("The mapping document is null!");
+        throw new GoraException("The mapping document is null!");
+      }
+
+      Element root = document.getRootElement();
+      // Extract class descriptions
+      @SuppressWarnings("unchecked")
+      List<Element> classElements = root.getChildren(TAG_CLASS);
+      for (Element classElement : classElements) {
+        final Class<T> persistentClass = dataStore.getPersistentClass();
+        final Class<K> keyClass = dataStore.getKeyClass();
+        if (haveKeyClass(keyClass, classElement)
+                && havePersistentClass(persistentClass, classElement)) {
+          loadPersistentClass(classElement, persistentClass);
+          break;
+        }
+      }
+    } catch (IOException | JDOMException | ConfigurationException | SAXException ex) {
+      throw new RuntimeException(ex);
+    }
+    LOG.info("Gora Elasticsearch mapping file was read successfully.");
+  }
+
+  private boolean haveKeyClass(final Class<K> keyClass, final Element classElement) {
+    // Compare from the class name side: a missing keyClass attribute (null) means "no match", not an NPE.
+    return keyClass.getName().equals(
+            classElement.getAttributeValue(ATT_KEYCLASS));
+  }
+
+  private boolean havePersistentClass(final Class<T> persistentClass, final Element classElement) {
+    // Compare from the class name side: a missing name attribute (null) means "no match", not an NPE.
+    return persistentClass.getName().equals(
+            classElement.getAttributeValue(ATT_NAME));
+  }
+
+  /**
+   * Handle the XML parsing of the class definition: sets the index name
+   * returned by the data store and registers every declared field on the
+   * mapping being built.
+   *
+   * @param classElement     the XML node containing the class definition
+   * @param pPersistentClass persistent class passed to the store to resolve the index name
+   */
+  protected void loadPersistentClass(Element classElement,
+                                     Class<T> pPersistentClass) {
+    // indexNameFromMapping could be null here
+    String indexNameFromMapping = classElement.getAttributeValue(ATT_INDEX);
+    String indexName = dataStore.getSchemaName(indexNameFromMapping, pPersistentClass);
+
+    elasticsearchMapping.setIndexName(indexName);
+    if (!indexName.equals(indexNameFromMapping)) {
+      ElasticsearchStore.LOG
+              .info("Keyclass and nameclass match, but mismatching index names. "
+                              + "Mappingfile schema is '{}' vs actual schema '{}', assuming they are the same.",
+                      indexNameFromMapping, indexName);
+    }
+
+    // Process fields declaration
+    @SuppressWarnings("unchecked")
+    List<Element> fields = classElement.getChildren(TAG_FIELD);
+    for (Element fieldElement : fields) {
+      // Locale.ROOT keeps the enum lookup independent of the JVM default locale (e.g. Turkish dotted I)
+      String fieldTypeName = fieldElement.getAttributeValue(ATT_TYPE).toUpperCase(Locale.ROOT);
+      Field.FieldType fieldType = new Field.FieldType(Field.DataType.valueOf(fieldTypeName));
+      Field field;
+      if (fieldType.getType() == Field.DataType.SCALED_FLOAT) {
+        int scalingFactor = Integer.parseInt(fieldElement.getAttributeValue(ATT_SCALINGFACTOR));
+        field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), new Field.FieldType(scalingFactor));
+      } else {
+        field = new Field(fieldElement.getAttributeValue(ATT_DOCFIELD), fieldType);
+      }
+      elasticsearchMapping.addField(fieldElement.getAttributeValue(ATT_NAME), field);
+    }
+  }
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java
new file mode 100644
index 0000000..e5021fb
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/Field.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.mapping;
+
+import java.util.Objects;
+import java.util.StringJoiner;
+
+/**
+ * Field definition for the Elasticsearch index.
+ */
+public class Field {
+
+  private String name;
+  private FieldType dataType;
+
+  /**
+   * Constructor for Field.
+   *
+   * @param name     Field's name
+   * @param dataType Field's data type
+   */
+  public Field(String name, FieldType dataType) {
+    this.name = name;
+    this.dataType = dataType;
+  }
+
+  /**
+   * Returns the field's name.
+   *
+   * @return Field's name
+   */
+  public String getName() {
+    return name;
+  }
+
+  /**
+   * Sets the field's name.
+   *
+   * @param name Field's name
+   */
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Returns the field's data-type.
+   *
+   * @return Field's data-type
+   */
+  public FieldType getDataType() {
+    return dataType;
+  }
+
+  /**
+   * Sets the field's data-type.
+   *
+   * @param dataType Field's data-type
+   */
+  public void setDataType(FieldType dataType) {
+    this.dataType = dataType;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    Field field = (Field) o;
+    return Objects.equals(name, field.name) && Objects.equals(dataType, field.dataType);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, dataType);
+  }
+
+  @Override
+  public String toString() {
+    return new StringJoiner(", ", Field.class.getSimpleName() + "[", "]")
+            .add("name='" + name + "'")
+            .add("dataType=" + dataType)
+            .toString();
+  }
+
+  /**
+   * Elasticsearch supported data-type enumeration. For a more detailed list of data
+   * types supported by Elasticsearch refer to
+   * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-types.html
+   */
+  public enum DataType {
+    BINARY,
+    BOOLEAN,
+    KEYWORD,
+    CONSTANT_KEYWORD,
+    WILDCARD,
+    LONG,
+    INTEGER,
+    SHORT,
+    BYTE,
+    DOUBLE,
+    FLOAT,
+    HALF_FLOAT,
+    SCALED_FLOAT,
+    OBJECT,
+    FLATTENED,
+    NESTED,
+    TEXT,
+    COMPLETION,
+    SEARCH_AS_YOU_TYPE,
+    TOKEN_COUNT
+  }
+
+  public static class FieldType {
+
+    private DataType type;
+
+    // Parameter for scaled_float type.
+    private int scalingFactor;
+
+    /**
+     * Constructor for FieldType.
+     *
+     * @param type Elasticsearch data type
+     */
+    public FieldType(DataType type) {
+      this.type = type;
+    }
+
+    /**
+     * Constructor for FieldType Implicitly uses scaled_float Elasticsearch data type
+     * with scaling factor parameter.
+     *
+     * @param scalingFactor scaled_float field's scaling factor
+     */
+    public FieldType(int scalingFactor) {
+      this.type = DataType.SCALED_FLOAT;
+      this.scalingFactor = scalingFactor;
+    }
+
+    /**
+     * @return Elasticsearch data type
+     */
+    public DataType getType() {
+      return type;
+    }
+
+    /**
+     * @param type Elasticsearch data type
+     */
+    public void setType(DataType type) {
+      this.type = type;
+    }
+
+    /**
+     * Returns the scaling factor of scaled_float type.
+     *
+     * @return scaled_float field's scaling factor
+     */
+    public int getScalingFactor() {
+      return scalingFactor;
+    }
+
+    /**
+     * Sets the scaling factor of scaled_float type.
+     *
+     * @param scalingFactor scaled_float field's scaling factor
+     */
+    public void setScalingFactor(int scalingFactor) {
+      this.scalingFactor = scalingFactor;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      FieldType fieldType = (FieldType) o;
+      return scalingFactor == fieldType.scalingFactor && type == fieldType.type;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(type, scalingFactor);
+    }
+  }
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/package-info.java
new file mode 100644
index 0000000..c5dccd1
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/mapping/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains Mapping related classes.
+ */
+package org.apache.gora.elasticsearch.mapping;
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/package-info.java
new file mode 100644
index 0000000..8d2313a
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains all Elasticsearch datastore related classes.
+ */
+package org.apache.gora.elasticsearch;
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java
new file mode 100644
index 0000000..214380e
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchQuery.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.query;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.impl.QueryBase;
+import org.apache.gora.store.DataStore;
+
+/**
+ * Elasticsearch specific implementation of the {@link Query} interface.
+ */
+public class ElasticsearchQuery<K, T extends PersistentBase> extends QueryBase<K, T> {
+
+  /**
+   * Constructor for the query.
+   *
+   * @param dataStore data store used
+   */
+  public ElasticsearchQuery(DataStore<K, T> dataStore) {
+    super(dataStore);
+  }
+
+  public ElasticsearchQuery() {
+    super(null);
+  }
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java
new file mode 100644
index 0000000..01d5d8e
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/ElasticsearchResult.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.query;
+
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.impl.ResultBase;
+import org.apache.gora.store.DataStore;
+
+import java.util.List;
+
+/**
+ * Elasticsearch specific implementation of the
+ * {@link org.apache.gora.query.Result} interface.
+ */
+public class ElasticsearchResult<K, T extends PersistentBase> extends ResultBase<K, T> {
+
+  /**
+   * Persistent objects this result iterates over.
+   */
+  private List<T> persistentObjects;
+
+  /**
+   * Keys of the persistent objects; both lists are read at the same position.
+   */
+  private List<K> persistentKeys;
+
+  /** Wraps the keys and objects that were collected for the given query. */
+  public ElasticsearchResult(DataStore<K, T> dataStore, Query<K, T> query, List<K> persistentKeys, List<T> persistentObjects) {
+    super(dataStore, query);
+    this.persistentObjects = persistentObjects;
+    this.persistentKeys = persistentKeys;
+  }
+
+  @Override
+  public float getProgress() {
+    final int total = persistentObjects.size();
+    // An empty result is reported as fully consumed.
+    return total == 0 ? 1 : offset / (float) total;
+  }
+
+  @Override
+  public int size() {
+    return persistentObjects.size();
+  }
+
+  @Override
+  protected boolean nextInner() {
+    final int position = (int) offset;
+    // Once the offset reaches the end of the list there is nothing left to hand out.
+    if (position == persistentObjects.size()) {
+      return false;
+    }
+    persistent = persistentObjects.get(position);
+    key = persistentKeys.get(position);
+    return persistent != null;
+  }
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/package-info.java
new file mode 100644
index 0000000..ad7b9d7
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/query/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains Query related classes.
+ */
+package org.apache.gora.elasticsearch.query;
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java
new file mode 100644
index 0000000..610f0fd
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStore.java
@@ -0,0 +1,872 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.store;
+
+import org.apache.avro.Schema;
+import org.apache.avro.util.Utf8;
+import org.apache.gora.elasticsearch.mapping.ElasticsearchMapping;
+import org.apache.gora.elasticsearch.mapping.ElasticsearchMappingBuilder;
+import org.apache.gora.elasticsearch.mapping.Field;
+import org.apache.gora.elasticsearch.query.ElasticsearchQuery;
+import org.apache.gora.elasticsearch.query.ElasticsearchResult;
+import org.apache.gora.elasticsearch.utils.ElasticsearchParameters;
+import org.apache.gora.filter.Filter;
+import org.apache.gora.persistency.Persistent;
+import org.apache.gora.persistency.impl.BeanFactoryImpl;
+import org.apache.gora.persistency.impl.DirtyListWrapper;
+import org.apache.gora.persistency.impl.DirtyMapWrapper;
+import org.apache.gora.persistency.impl.PersistentBase;
+import org.apache.gora.query.PartitionQuery;
+import org.apache.gora.query.Query;
+import org.apache.gora.query.Result;
+import org.apache.gora.query.impl.PartitionQueryImpl;
+import org.apache.gora.store.impl.DataStoreBase;
+import org.apache.gora.util.AvroUtils;
+import org.apache.gora.util.ClassLoadingUtils;
+import org.apache.gora.util.GoraException;
+import org.apache.http.Header;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.reactor.IOReactorConfig;
+import org.apache.http.message.BasicHeader;
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.flush.FlushRequest;
+import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
+import org.elasticsearch.action.delete.DeleteRequest;
+import org.elasticsearch.action.delete.DeleteResponse;
+import org.elasticsearch.action.get.GetRequest;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.reindex.BulkByScrollResponse;
+import org.elasticsearch.index.reindex.DeleteByQueryRequest;
+import org.elasticsearch.index.reindex.UpdateByQueryRequest;
+import org.elasticsearch.script.Script;
+import org.elasticsearch.script.ScriptType;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Implementation of an Elasticsearch data store to be used by Apache Gora.
+ *
+ * @param <K> class to be used for the key
+ * @param <T> class to be persisted within the store
+ */
+public class ElasticsearchStore<K, T extends PersistentBase> extends DataStoreBase<K, T> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStore.class);
+  private static final String DEFAULT_MAPPING_FILE = "gora-elasticsearch-mapping.xml";
+  public static final String PARSE_MAPPING_FILE_KEY = "gora.elasticsearch.mapping.file";
+  private static final String XML_MAPPING_DEFINITION = "gora.mapping";
+  public static final String XSD_VALIDATION = "gora.xsd_validation";
+
+  /**
+   * Elasticsearch client
+   */
+  private RestHighLevelClient client;
+
+  /**
+   * Mapping definition for Elasticsearch
+   */
+  private ElasticsearchMapping elasticsearchMapping;
+
+  @Override
+  public void initialize(Class<K> keyClass, Class<T> persistentClass, Properties properties) throws GoraException {
+    try {
+      LOG.debug("Initializing Elasticsearch store");
+      ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf());
+      super.initialize(keyClass, persistentClass, properties);
+      ElasticsearchMappingBuilder<K, T> builder = new ElasticsearchMappingBuilder<>(this);
+      InputStream mappingStream;
+      if (properties.containsKey(XML_MAPPING_DEFINITION)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("{} = {}", XML_MAPPING_DEFINITION, properties.getProperty(XML_MAPPING_DEFINITION));
+        }
+        mappingStream = org.apache.commons.io.IOUtils.toInputStream(properties.getProperty(XML_MAPPING_DEFINITION), (Charset) null);
+      } else {
+        mappingStream = getClass().getClassLoader().getResourceAsStream(properties.getProperty(PARSE_MAPPING_FILE_KEY, DEFAULT_MAPPING_FILE));
+      }
+      String xsdValidation = properties.getProperty(XSD_VALIDATION, "false");
+      builder.readMappingFile(mappingStream, Boolean.parseBoolean(xsdValidation));
+      elasticsearchMapping = builder.getElasticsearchMapping();
+      client = createClient(parameters);
+      LOG.info("Elasticsearch store was successfully initialized.");
+    } catch (Exception ex) {
+      LOG.error("Error while initializing Elasticsearch store", ex);
+      throw new GoraException(ex);
+    }
+  }
+
+  /**
+   * Creates the Elasticsearch REST client. RestClientBuilder keeps only the last callback of
+   * each kind, so request-config and HTTP-client settings are applied in one callback each.
+   * @param parameters host, port, authentication and tuning settings
+   * @return the configured client
+   * @throws IllegalArgumentException if the selected authentication lacks its credentials
+   */
+  public static RestHighLevelClient createClient(ElasticsearchParameters parameters) {
+    RestClientBuilder clientBuilder = RestClient.builder(new HttpHost(parameters.getHost(), parameters.getPort()));
+    CredentialsProvider basicCredentials = null;
+    switch (parameters.getAuthenticationType()) {
+      case BASIC:
+        if (parameters.getUsername() == null || parameters.getPassword() == null) {
+          throw new IllegalArgumentException("Missing username or password for BASIC authentication.");
+        }
+        basicCredentials = new BasicCredentialsProvider();
+        basicCredentials.setCredentials(AuthScope.ANY,
+                new UsernamePasswordCredentials(parameters.getUsername(), parameters.getPassword()));
+        break;
+      case TOKEN:
+        if (parameters.getAuthorizationToken() == null) {
+          throw new IllegalArgumentException("Missing authorization token for TOKEN authentication.");
+        }
+        clientBuilder.setDefaultHeaders(new Header[]{new BasicHeader("Authorization", parameters.getAuthorizationToken())});
+        break;
+      case APIKEY:
+        if (parameters.getApiKeyId() == null || parameters.getApiKeySecret() == null) {
+          throw new IllegalArgumentException("Missing API Key ID or API Key Secret for APIKEY authentication.");
+        }
+        String apiKeyAuth = Base64.getEncoder().encodeToString(
+                (parameters.getApiKeyId() + ":" + parameters.getApiKeySecret()).getBytes(StandardCharsets.UTF_8));
+        clientBuilder.setDefaultHeaders(new Header[]{new BasicHeader("Authorization", "ApiKey " + apiKeyAuth)});
+        break;
+    }
+    clientBuilder.setRequestConfigCallback(requestConfigBuilder -> {
+      if (parameters.getConnectTimeout() != 0) {
+        requestConfigBuilder.setConnectTimeout(parameters.getConnectTimeout());
+      }
+      if (parameters.getSocketTimeout() != 0) {
+        requestConfigBuilder.setSocketTimeout(parameters.getSocketTimeout());
+      }
+      return requestConfigBuilder;
+    });
+    final CredentialsProvider credentialsProvider = basicCredentials; // effectively final copy for the lambda
+    clientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
+      if (credentialsProvider != null) {
+        httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+      }
+      if (parameters.getIoThreadCount() != 0) {
+        httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(parameters.getIoThreadCount()).build());
+      }
+      return httpClientBuilder;
+    });
+    return new RestHighLevelClient(clientBuilder);
+  }
+
+  public ElasticsearchMapping getMapping() {
+    return elasticsearchMapping;
+  }
+
+  @Override
+  public String getSchemaName() {
+    return elasticsearchMapping.getIndexName();
+  }
+
+  @Override
+  public String getSchemaName(final String mappingSchemaName, final Class<?> persistentClass) {
+    return super.getSchemaName(mappingSchemaName, persistentClass);
+  }
+
+  /**
+   * Creates the index, unless it already exists, with one property per mapped field plus gora_id.
+   */
+  @Override
+  public void createSchema() throws GoraException {
+    CreateIndexRequest request = new CreateIndexRequest(elasticsearchMapping.getIndexName());
+    Map<String, Object> properties = new HashMap<>();
+    for (Map.Entry<String, Field> entry : elasticsearchMapping.getFields().entrySet()) {
+      Map<String, Object> fieldType = new HashMap<>();
+      fieldType.put("type", entry.getValue().getDataType().getType().name().toLowerCase(Locale.ROOT));
+      if (entry.getValue().getDataType().getType() == Field.DataType.SCALED_FLOAT) {
+        fieldType.put("scaling_factor", entry.getValue().getDataType().getScalingFactor());
+      }
+      properties.put(entry.getKey(), fieldType);
+    }
+    // Special field for range query. A plain singleton map is used instead of double-brace
+    // initialization, which would create an anonymous HashMap subclass capturing this store.
+    properties.put("gora_id", Collections.singletonMap("type", "keyword"));
+    request.mapping(Collections.singletonMap("properties", properties));
+    try {
+      if (!client.indices().exists(
+              new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) {
+        client.indices().create(request, RequestOptions.DEFAULT);
+      }
+    } catch (IOException ex) {
+      throw new GoraException(ex);
+    }
+  }
+
+  @Override
+  public void deleteSchema() throws GoraException {
+    DeleteIndexRequest request = new DeleteIndexRequest(elasticsearchMapping.getIndexName());
+    try {
+      if (client.indices().exists(
+              new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT)) {
+        client.indices().delete(request, RequestOptions.DEFAULT);
+      }
+    } catch (IOException ex) {
+      throw new GoraException(ex);
+    }
+  }
+
+  @Override
+  public boolean schemaExists() throws GoraException {
+    try {
+      return client.indices().exists(
+              new GetIndexRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
+    } catch (IOException ex) {
+      throw new GoraException(ex);
+    }
+  }
+
+  @Override
+  public boolean exists(K key) throws GoraException {
+    GetRequest getRequest = new GetRequest(elasticsearchMapping.getIndexName(), (String) key);
+    getRequest.fetchSourceContext(new FetchSourceContext(false)).storedFields("_none_");
+    try {
+      return client.exists(getRequest, RequestOptions.DEFAULT);
+    } catch (IOException ex) {
+      throw new GoraException(ex);
+    }
+  }
+
+  /**
+   * Fetches the document stored under the key and maps the requested fields to a persistent
+   * object; returns null when no such document exists.
+   */
+  @Override
+  public T get(K key, String[] fields) throws GoraException {
+    String[] requestedFields = getFieldsToQuery(fields);
+    // Translate the requested field names into the document field names of the mapping.
+    String[] documentFields = new String[requestedFields.length];
+    for (int i = 0; i < requestedFields.length; i++) {
+      documentFields[i] = elasticsearchMapping.getFields().get(requestedFields[i]).getName();
+    }
+    try {
+      GetResponse getResponse = client.get(
+              new GetRequest(elasticsearchMapping.getIndexName(), (String) key), RequestOptions.DEFAULT);
+      if (!getResponse.isExists()) {
+        return null;
+      }
+      // Keep only the requested fields of the document source.
+      Map<String, Object> sourceMap = getResponse.getSourceAsMap();
+      Map<String, Object> fieldsAndValues = new HashMap<>();
+      for (String documentField : documentFields) {
+        fieldsAndValues.put(documentField, sourceMap.get(documentField));
+      }
+      return newInstance(fieldsAndValues, requestedFields);
+    } catch (IOException ex) {
+      throw new GoraException(ex);
+    }
+  }
+
+  /**
+   * Indexes the non-null mapped fields of a dirty object under the key; other objects are skipped.
+   */
+  @Override
+  public void put(K key, T obj) throws GoraException {
+    if (!obj.isDirty()) {
+      LOG.info("Ignored putting object {} in the store as it is neither "
+              + "new, neither dirty.", new Object[]{obj});
+      return;
+    }
+    Map<String, Object> jsonMap = new HashMap<>();
+    for (Schema.Field field : obj.getSchema().getFields()) {
+      Field mappedField = elasticsearchMapping.getFields().get(field.name());
+      if (mappedField == null) {
+        continue;
+      }
+      Object fieldValue = obj.get(field.pos());
+      if (fieldValue != null) {
+        jsonMap.put(mappedField.getName(), serializeFieldValue(field.schema(), fieldValue));
+      }
+    }
+    // Special field for range query
+    jsonMap.put("gora_id", key);
+    // Prepare the Elasticsearch request
+    IndexRequest request = new IndexRequest(elasticsearchMapping.getIndexName()).id((String) key).source(jsonMap);
+    try {
+      client.index(request, RequestOptions.DEFAULT);
+    } catch (IOException ex) {
+      throw new GoraException(ex);
+    }
+  }
+
+  @Override
+  public boolean delete(K key) throws GoraException {
+    DeleteRequest request = new DeleteRequest(elasticsearchMapping.getIndexName(), (String) key);
+    try {
+      DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
+      return deleteResponse.getResult() != DocWriteResponse.Result.NOT_FOUND;
+    } catch (IOException ex) {
+      throw new GoraException(ex);
+    }
+  }
+
+  /**
+   * Deletes the documents in the query's key range, or only strips the queried fields
+   * from them when the query names fewer fields than the mapping defines.
+   * @return number of deleted documents, or of updated documents when only fields were removed
+   */
+  @Override
+  public long deleteByQuery(Query<K, T> query) throws GoraException {
+    // Both branches select the documents whose gora_id lies within the query's key range.
+    QueryBuilder keyRange = QueryBuilders
+            .rangeQuery("gora_id").from(query.getStartKey()).to(query.getEndKey());
+    String[] fieldsToDelete = query.getFields();
+    boolean wholeDocuments = fieldsToDelete == null
+            || fieldsToDelete.length >= elasticsearchMapping.getFields().size();
+    try {
+      if (wholeDocuments) {
+        DeleteByQueryRequest deleteRequest = new DeleteByQueryRequest(elasticsearchMapping.getIndexName());
+        deleteRequest.setQuery(keyRange);
+        return client.deleteByQuery(deleteRequest, RequestOptions.DEFAULT).getDeleted();
+      }
+      // Only a subset of the fields is requested: strip them with a painless update script.
+      StringBuilder removals = new StringBuilder();
+      for (String field : fieldsToDelete) {
+        String elasticsearchField = elasticsearchMapping.getFields().get(field).getName();
+        removals.append(String.format(Locale.getDefault(), "ctx._source.remove('%s');", elasticsearchField));
+      }
+      UpdateByQueryRequest updateRequest = new UpdateByQueryRequest(elasticsearchMapping.getIndexName());
+      updateRequest.setQuery(keyRange);
+      updateRequest.setScript(new Script(ScriptType.INLINE, "painless", removals.toString(), Collections.emptyMap()));
+      return client.updateByQuery(updateRequest, RequestOptions.DEFAULT).getUpdated();
+    } catch (IOException ex) {
+      throw new GoraException(ex);
+    }
+  }
+
+  /**
+   * Runs a gora_id range search and returns the hits that the query's filter does not exclude.
+   * NOTE(review): with no limit set, Elasticsearch's default page size applies — confirm intended.
+   */
+  @Override
+  public Result<K, T> execute(Query<K, T> query) throws GoraException {
+    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+    // Set the query result limit
+    int size = (int) query.getLimit();
+    if (size != -1) {
+      searchSourceBuilder.size(size);
+    }
+    try {
+      // Build the actual Elasticsearch range query
+      QueryBuilder rangeQueryBuilder = QueryBuilders
+              .rangeQuery("gora_id").gte(query.getStartKey()).lte(query.getEndKey());
+      searchSourceBuilder.query(rangeQueryBuilder);
+      SearchRequest searchRequest = new SearchRequest(elasticsearchMapping.getIndexName());
+      searchRequest.source(searchSourceBuilder);
+      SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
+
+      String[] avroFields = getFieldsToQuery(query.getFields());
+      Filter<K, T> queryFilter = query.getFilter();
+      List<K> hitId = new ArrayList<>();
+      List<T> filteredObjects = new ArrayList<>();
+      for (SearchHit hit : searchResponse.getHits().getHits()) {
+        // Deserialize each hit once and reuse the instance for both the filter and the result.
+        K hitKey = (K) hit.getId();
+        T instance = newInstance(hit.getSourceAsMap(), avroFields);
+        if (queryFilter == null || !queryFilter.filter(hitKey, instance)) {
+          filteredObjects.add(instance);
+          hitId.add(hitKey);
+        }
+      }
+      return new ElasticsearchResult<>(this, query, hitId, filteredObjects);
+    } catch (IOException ex) {
+      throw new GoraException(ex);
+    }
+  }
+
+  @Override
+  public Query<K, T> newQuery() {
+    ElasticsearchQuery<K, T> query = new ElasticsearchQuery<>(this);
+    query.setFields(getFieldsToQuery(null));
+    return query;
+  }
+
+  @Override
+  public List<PartitionQuery<K, T>> getPartitions(Query<K, T> query) throws IOException {
+    List<PartitionQuery<K, T>> partitions = new ArrayList<>();
+    PartitionQueryImpl<K, T> partitionQuery = new PartitionQueryImpl<>(
+            query);
+    partitionQuery.setConf(getConf());
+    partitions.add(partitionQuery);
+    return partitions;
+  }
+
+  @Override
+  public void flush() throws GoraException {
+    try {
+      client.indices().refresh(new RefreshRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
+      client.indices().flush(new FlushRequest(elasticsearchMapping.getIndexName()), RequestOptions.DEFAULT);
+    } catch (IOException ex) {
+      throw new GoraException(ex);
+    }
+  }
+
+  @Override
+  public void close() {
+    try {
+      client.close();
+      LOG.info("Elasticsearch datastore destroyed successfully.");
+    } catch (IOException ex) {
+      LOG.error(ex.getMessage(), ex);
+    }
+  }
+
+  /**
+   * Build a new instance of the persisted class from the Document retrieved from the database.
+   *
+   * @param fieldsAndValues Map of field's name and its value from the Document
+   *                        that results from the query to the database
+   * @param requestedFields the list of fields to be mapped to the persistence class instance
+   * @return a persistence class instance which content was deserialized from the Document
+   * @throws IOException if a field value cannot be deserialized
+   */
+  public T newInstance(Map<String, Object> fieldsAndValues, String[] requestedFields) throws IOException {
+    // Create new empty persistent bean instance
+    T persistent = newPersistent();
+
+    requestedFields = getFieldsToQuery(requestedFields);
+    // Populate each field
+    for (String objField : requestedFields) {
+      Schema.Field field = fieldMap.get(objField);
+      Schema fieldSchema = field.schema();
+      String docFieldName = elasticsearchMapping.getFields().get(objField).getName();
+      Object fieldValue = fieldsAndValues.get(docFieldName);
+
+      Object result = deserializeFieldValue(field, fieldSchema, fieldValue);
+      persistent.put(field.pos(), result);
+    }
+    persistent.clearDirty();
+    return persistent;
+  }
+
+  /**
+   * Deserialize an Elasticsearch object to a persistent Avro object.
+   *
+   * @param avroField          persistent Avro class field to which the value will be deserialized
+   * @param avroFieldSchema    schema for the persistent Avro class field
+   * @param elasticsearchValue Elasticsearch field value to be deserialized
+   * @return deserialized Avro object from the Elasticsearch object
+   * @throws GoraException when the given Elasticsearch value cannot be deserialized
+   */
+  private Object deserializeFieldValue(Schema.Field avroField, Schema avroFieldSchema,
+                                       Object elasticsearchValue) throws GoraException {
+    Schema.Type fieldType = avroFieldSchema.getType();
+    // A value missing from the Elasticsearch document has nothing to parse, so it maps to
+    // null. MAP, ARRAY and UNION are still dispatched so their helpers keep deciding what
+    // null means for them; every other branch dereferences the value and would otherwise
+    // fail with a NullPointerException.
+    if (elasticsearchValue == null && fieldType != Schema.Type.MAP
+            && fieldType != Schema.Type.ARRAY && fieldType != Schema.Type.UNION) {
+      return null;
+    }
+    switch (fieldType) {
+      // Containers and unions recurse through their dedicated helpers.
+      case MAP:
+        return fromElasticsearchMap(avroField, avroFieldSchema.getValueType(), (Map<String, Object>) elasticsearchValue);
+      case RECORD:
+        return fromElasticsearchRecord(avroFieldSchema, (Map<String, Object>) elasticsearchValue);
+      case ARRAY:
+        return fromElasticsearchList(avroField, avroFieldSchema.getElementType(), elasticsearchValue);
+      case UNION:
+        return fromElasticsearchUnion(avroField, avroFieldSchema, elasticsearchValue);
+      // Scalars are parsed from the textual form of the stored value.
+      case BOOLEAN:
+        return Boolean.parseBoolean(elasticsearchValue.toString());
+      case BYTES:
+        // Binary content is expected as Base64 text.
+        return ByteBuffer.wrap(Base64.getDecoder().decode(elasticsearchValue.toString()));
+      case DOUBLE:
+        return Double.parseDouble(elasticsearchValue.toString());
+      case ENUM:
+        return AvroUtils.getEnumValue(avroFieldSchema, elasticsearchValue.toString());
+      case FLOAT:
+        return Float.parseFloat(elasticsearchValue.toString());
+      case INT:
+        return Integer.parseInt(elasticsearchValue.toString());
+      case LONG:
+        return Long.parseLong(elasticsearchValue.toString());
+      case STRING:
+        // Strings are handed to Avro as Utf8.
+        return new Utf8(elasticsearchValue.toString());
+      case FIXED:
+      case NULL:
+        // FIXED values are not deserialized here and, like NULL, always read back as null.
+        return null;
+      default:
+        // Any other type is passed through untouched.
+        return elasticsearchValue;
+    }
+  }
+
+  /**
+   * Deserialize an Elasticsearch List to an Avro List as used in Gora generated classes
+   * that can safely be written into Avro persistent object.
+   *
+   * @param avroField          persistent Avro class field to which the value will be deserialized
+   * @param avroFieldSchema    schema for the persistent Avro class field
+   * @param elasticsearchValue Elasticsearch field value to be deserialized
+   * @return deserialized Avro List from the given Elasticsearch value
+   * @throws GoraException when one of the underlying values cannot be deserialized
+   */
+  private Object fromElasticsearchList(Schema.Field avroField, Schema avroFieldSchema,
+                                       Object elasticsearchValue) throws GoraException {
+    List<Object> elements = new ArrayList<>();
+    if (elasticsearchValue == null) {
+      return new DirtyListWrapper<>(elements);
+    }
+    for (Object rawItem : (List<Object>) elasticsearchValue) {
+      elements.add(deserializeFieldValue(avroField, avroFieldSchema, rawItem));
+    }
+    return new DirtyListWrapper<>(elements);
+  }
+
+  /**
+   * Deserialize an Elasticsearch Map to an Avro Map as used in Gora generated classes
+   * that can safely be written into Avro persistent object.
+   *
+   * @param avroField        persistent Avro class field to which the value will be deserialized
+   * @param avroFieldSchema  schema for the persistent Avro class field
+   * @param elasticsearchMap Elasticsearch Map value to be deserialized
+   * @return deserialized Avro Map from the given Elasticsearch Map value
+   * @throws GoraException when one of the underlying values cannot be deserialized
+   */
+  private Object fromElasticsearchMap(Schema.Field avroField, Schema avroFieldSchema,
+                                      Map<String, Object> elasticsearchMap) throws GoraException {
+    Map<Utf8, Object> converted = new HashMap<>();
+    if (elasticsearchMap == null) {
+      return new DirtyMapWrapper<>(converted);
+    }
+    for (Map.Entry<String, Object> rawEntry : elasticsearchMap.entrySet()) {
+      Object value = deserializeFieldValue(avroField, avroFieldSchema, rawEntry.getValue());
+      converted.put(new Utf8(rawEntry.getKey()), value);
+    }
+    return new DirtyMapWrapper<>(converted);
+  }
+
+  /**
+   * Deserialize an Elasticsearch Record to an Avro Object as used in Gora generated classes
+   * that can safely be written into Avro persistent object.
+   *
+   * @param avroFieldSchema     schema for the persistent Avro class field
+   * @param elasticsearchRecord Elasticsearch Record value to be deserialized
+   * @return deserialized Avro Object from the given Elasticsearch Record value
+   * @throws GoraException when one of the underlying values cannot be deserialized
+   */
+  private Object fromElasticsearchRecord(Schema avroFieldSchema,
+                                         Map<String, Object> elasticsearchRecord) throws GoraException {
+    Class<?> recordClass;
+    try {
+      recordClass = ClassLoadingUtils.loadClass(avroFieldSchema.getFullName());
+    } catch (ClassNotFoundException notFound) {
+      throw new GoraException(notFound);
+    }
+    PersistentBase persistent = (PersistentBase) new BeanFactoryImpl(keyClass, recordClass).newPersistent();
+    for (Schema.Field member : avroFieldSchema.getFields()) {
+      // Look up the mapped document field; unmapped members get a placeholder built from the Avro name.
+      Field docField = elasticsearchMapping.getFields().getOrDefault(member.name(), new Field(member.name(), null));
+      persistent.put(member.pos(), deserializeFieldValue(member, member.schema(), elasticsearchRecord.get(docField.getName())));
+    }
+    return persistent;
+  }
+
+  /**
+   * Deserialize an Elasticsearch Union to an Avro Object as used in Gora generated classes
+   * that can safely be written into Avro persistent object.
+   *
+   * @param avroField          persistent Avro class field to which the value will be deserialized
+   * @param avroFieldSchema    schema for the persistent Avro class field
+   * @param elasticsearchUnion Elasticsearch Union value to be deserialized
+   * @return deserialized Avro Object from the given Elasticsearch Union value
+   * @throws GoraException when one of the underlying values cannot be deserialized
+   */
+  private Object fromElasticsearchUnion(Schema.Field avroField, Schema avroFieldSchema, Object elasticsearchUnion) throws GoraException {
+    List<Schema> branches = avroFieldSchema.getTypes();
+    // Check the size first: reading branch 1 of a smaller union throws IndexOutOfBoundsException.
+    if (branches.size() == 2) {
+      Schema.Type type0 = branches.get(0).getType();
+      Schema.Type type1 = branches.get(1).getType();
+      if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) && !type0.equals(type1)) {
+        // Nullable single type: deserialize with the branch selected for the value.
+        Schema unionSchema = branches.get(getUnionSchema(elasticsearchUnion, avroFieldSchema));
+        return deserializeFieldValue(avroField, unionSchema, elasticsearchUnion);
+      }
+    } else if (branches.size() == 3) {
+      boolean hasNull = false;
+      boolean hasString = false;
+      for (Schema branch : branches) {
+        hasNull |= branch.getType().equals(Schema.Type.NULL);
+        hasString |= branch.getType().equals(Schema.Type.STRING);
+      }
+      if (hasNull && hasString) {
+        if (elasticsearchUnion == null) {
+          return null;
+        }
+        if (elasticsearchUnion instanceof String) {
+          throw new GoraException("Elasticsearch supports Union data type only represented as Record or Null.");
+        }
+        Schema unionSchema = branches.get(getUnionSchema(elasticsearchUnion, avroFieldSchema));
+        return fromElasticsearchRecord(unionSchema, (Map<String, Object>) elasticsearchUnion);
+      }
+    }
+    throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
+  }
+
+  /**
+   * Serialize a persistent Avro object as used in Gora generated classes to
+   * an object that can be written into Elasticsearch.
+   *
+   * @param avroFieldSchema schema for the persistent Avro class field
+   * @param avroFieldValue  persistent Avro field value to be serialized
+   * @return serialized field value
+   * @throws GoraException when the given Avro object cannot be serialized
+   */
+  private Object serializeFieldValue(Schema avroFieldSchema, Object avroFieldValue) throws GoraException {
+    switch (avroFieldSchema.getType()) {
+      case ARRAY:
+        return arrayToElasticsearch((List<?>) avroFieldValue, avroFieldSchema.getElementType());
+      case MAP:
+        return mapToElasticsearch((Map<CharSequence, ?>) avroFieldValue, avroFieldSchema.getValueType());
+      case RECORD:
+        return recordToElasticsearch(avroFieldValue, avroFieldSchema);
+      case UNION:
+        return unionToElasticsearch(avroFieldValue, avroFieldSchema);
+      case BYTES:
+        // Encode only the readable window (position..limit) of the buffer. array() would
+        // expose the whole backing array and is unavailable for read-only or direct buffers.
+        // Working on a duplicate leaves the caller's position untouched.
+        ByteBuffer readable = ((ByteBuffer) avroFieldValue).duplicate();
+        byte[] content = new byte[readable.remaining()];
+        readable.get(content);
+        return Base64.getEncoder().encodeToString(content);
+      case BOOLEAN:
+      case DOUBLE:
+      case ENUM:
+      case FLOAT:
+      case INT:
+      case LONG:
+      case STRING:
+        // Scalars are stored in their textual form.
+        return avroFieldValue.toString();
+      case NULL:
+        return null;
+      case FIXED:
+      default:
+        // FIXED is not converted: the value is handed over as it is.
+        return avroFieldValue;
+    }
+  }
+
+  /**
+   * Serialize a Java collection of persistent Avro objects as used in Gora generated classes to a
+   * List that can safely be written into Elasticsearch.
+   *
+   * @param collection      the collection to be serialized
+   * @param avroFieldSchema field schema for the underlying type
+   * @return a List version of the collection that can be safely written into Elasticsearch
+   * @throws GoraException when one of the underlying values cannot be serialized
+   */
+  private List<Object> arrayToElasticsearch(Collection<?> collection, Schema avroFieldSchema) throws GoraException {
+    // Each element is serialized with the given schema, in iteration order.
+    List<Object> serialized = new ArrayList<>();
+    for (Object element : collection) {
+      serialized.add(serializeFieldValue(avroFieldSchema, element));
+    }
+    return serialized;
+  }
+
+  /**
+   * Serialize a Java map of persistent Avro objects as used in Gora generated classes to a
+   * map that can safely be written into Elasticsearch.
+   *
+   * @param map             the map to be serialized
+   * @param avroFieldSchema field schema for the underlying type
+   * @return a Map version of the Java map that can be safely written into Elasticsearch
+   * @throws GoraException when one of the underlying values cannot be serialized
+   */
+  private Map<CharSequence, ?> mapToElasticsearch(Map<CharSequence, ?> map, Schema avroFieldSchema) throws GoraException {
+    // Keys are converted to plain Strings; values are serialized with the given schema.
+    // A fresh HashMap is returned, the input map is left untouched.
+    Map<CharSequence, Object> converted = new HashMap<>();
+    for (Map.Entry<CharSequence, ?> pair : map.entrySet()) {
+      String key = pair.getKey().toString();
+      converted.put(key, serializeFieldValue(avroFieldSchema, pair.getValue()));
+    }
+    return converted;
+  }
+
+  /**
+   * Serialize a Java object of persistent Avro objects as used in Gora generated classes to a
+   * record that can safely be written into Elasticsearch.
+   *
+   * @param record          the object to be serialized
+   * @param avroFieldSchema field schema for the underlying type
+   * @return a record version of the Java object that can be safely written into Elasticsearch
+   * @throws GoraException when one of the underlying values cannot be serialized
+   */
+  private Map<CharSequence, Object> recordToElasticsearch(Object record, Schema avroFieldSchema) throws GoraException {
+    // One entry per schema field, keyed by the Avro field name.
+    Map<CharSequence, Object> document = new HashMap<>();
+    for (Schema.Field field : avroFieldSchema.getFields()) {
+      document.put(field.name(), serializeFieldValue(field.schema(), ((PersistentBase) record).get(field.pos())));
+    }
+    return document;
+  }
+
+  /**
+   * Serialize a Java object of persistent Avro objects as used in Gora generated classes to an
+   * object that can safely be written into Elasticsearch.
+   *
+   * @param union           the object to be serialized
+   * @param avroFieldSchema field schema for the underlying type
+   * @return an object version of the Java object that can be safely written into Elasticsearch
+   * @throws GoraException when one of the underlying values cannot be serialized
+   */
+  private Object unionToElasticsearch(Object union, Schema avroFieldSchema) throws GoraException {
+    List<Schema> branches = avroFieldSchema.getTypes();
+    // Check the size first: reading branch 1 of a smaller union throws IndexOutOfBoundsException.
+    if (branches.size() == 2) {
+      Schema.Type type0 = branches.get(0).getType();
+      Schema.Type type1 = branches.get(1).getType();
+      if ((type0.equals(Schema.Type.NULL) || type1.equals(Schema.Type.NULL)) && !type0.equals(type1)) {
+        // Nullable single type: serialize with the branch selected for the value.
+        Schema unionSchema = branches.get(getUnionSchema(union, avroFieldSchema));
+        return serializeFieldValue(unionSchema, union);
+      }
+    } else if (branches.size() == 3) {
+      boolean hasNull = false;
+      boolean hasString = false;
+      for (Schema branch : branches) {
+        hasNull |= branch.getType().equals(Schema.Type.NULL);
+        hasString |= branch.getType().equals(Schema.Type.STRING);
+      }
+      if (hasNull && hasString) {
+        if (union == null) {
+          return null;
+        }
+        if (union instanceof String) {
+          throw new GoraException("Elasticsearch does not support foreign key IDs in Union data type.");
+        }
+        Schema unionSchema = branches.get(getUnionSchema(union, avroFieldSchema));
+        return recordToElasticsearch(union, unionSchema);
+      }
+    }
+    throw new GoraException("Elasticsearch only supports Union of two types field: Record or Null.");
+  }
+
+  /**
+   * Method to retrieve the corresponding schema type index of a particular
+   * object having UNION schema. As UNION type can have one or more types and at
+   * a given instance, it holds an object of only one type of the defined types,
+   * this method is used to figure out the corresponding instance's schema type
+   * index.
+   *
+   * @param instanceValue value that the object holds
+   * @param unionSchema   union schema containing all of the data types
+   * @return the position of the matching type within the union, or 0 when no type matches
+   */
+  private int getUnionSchema(Object instanceValue, Schema unionSchema) {
+    // Branches are tested in declaration order; the first compatible one wins.
+    int unionSchemaPos = 0;
+    for (Schema currentSchema : unionSchema.getTypes()) {
+      if (matchesUnionBranch(instanceValue, currentSchema.getType())) {
+        return unionSchemaPos;
+      }
+      unionSchemaPos++;
+    }
+    // No branch recognised the value: fall back to the first declared type.
+    // NOTE(review): serializeFieldValue stores scalars via toString(); if they are read back
+    // as String, the INT/LONG/FLOAT/DOUBLE/BOOLEAN branches never match and end here - confirm.
+    return 0;
+  }
+
+  /**
+   * Tells whether a value can be held by a union branch of the given type.
+   * <p>
+   * A null value matches the NULL branch only, so a nullable union resolves null to its NULL
+   * branch wherever that branch is declared instead of always defaulting to position 0.
+   *
+   * @param value the value held by the union, possibly null
+   * @param type  the Avro type of the union branch being tested
+   * @return true when the Java type of the value is accepted for that branch
+   */
+  private static boolean matchesUnionBranch(Object value, Schema.Type type) {
+    switch (type) {
+      case NULL:
+        // Only null belongs to the NULL branch.
+        return value == null;
+      case STRING:
+        return value instanceof CharSequence;
+      case BYTES:
+        // Bytes may show up as a buffer, a raw array or a String.
+        return value instanceof ByteBuffer || value instanceof byte[] || value instanceof String;
+      case INT:
+        return value instanceof Integer;
+      case LONG:
+        return value instanceof Long;
+      case DOUBLE:
+        return value instanceof Double;
+      case FLOAT:
+        return value instanceof Float;
+      case BOOLEAN:
+        return value instanceof Boolean;
+      case MAP:
+        return value instanceof Map || value instanceof byte[];
+      case ARRAY:
+        return value instanceof List || value instanceof byte[];
+      case RECORD:
+        return value instanceof Persistent || value instanceof Map || value instanceof byte[];
+      default:
+        // ENUM, FIXED and UNION branches are never selected by value.
+        return false;
+    }
+  }
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java
new file mode 100644
index 0000000..c253e87
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreCollectionMetadata.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.store;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Class with the collection info returned by ElasticsearchStoreMetadataAnalyzer with the information
+ * about an ElasticsearchStore collection.
+ */
+public class ElasticsearchStoreCollectionMetadata {
+
+  /** Document keys found in the collection; empty until a list is set. */
+  private List<String> documentKeys = new ArrayList<>();
+
+  /** Document types found in the collection; empty until a list is set. */
+  private List<String> documentTypes = new ArrayList<>();
+
+  /** @return the document keys of the collection */
+  public List<String> getDocumentKeys() {
+    return this.documentKeys;
+  }
+
+  /** @param documentKeys the document keys to store, replacing the current list */
+  public void setDocumentKeys(List<String> documentKeys) {
+    this.documentKeys = documentKeys;
+  }
+
+  /** @return the document types of the collection */
+  public List<String> getDocumentTypes() {
+    return this.documentTypes;
+  }
+
+  /** @param documentTypes the document types to store, replacing the current list */
+  public void setDocumentTypes(List<String> documentTypes) {
+    this.documentTypes = documentTypes;
+  }
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java
new file mode 100644
index 0000000..465c268
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/ElasticsearchStoreMetadataAnalyzer.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.store;
+
+import org.apache.gora.elasticsearch.utils.ElasticsearchParameters;
+import org.apache.gora.store.impl.DataStoreMetadataAnalyzer;
+import org.apache.gora.util.GoraException;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.client.indices.GetIndexRequest;
+import org.elasticsearch.client.indices.GetIndexResponse;
+import org.elasticsearch.cluster.metadata.MappingMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/** Metadata analyzer exposing the indices of an Elasticsearch server and their mapped fields. */
+public class ElasticsearchStoreMetadataAnalyzer extends DataStoreMetadataAnalyzer {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchStoreMetadataAnalyzer.class);
+  private RestHighLevelClient elasticsearchClient;
+
+  @Override
+  public void initialize() throws GoraException {
+    ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, getConf());
+    elasticsearchClient = ElasticsearchStore.createClient(parameters);
+  }
+
+  @Override
+  public String getType() {
+    return "ELASTICSEARCH";
+  }
+
+  /** Lists the names of all indices returned for the "*" pattern. */
+  @Override
+  public List<String> getTablesNames() throws GoraException {
+    GetIndexResponse response;
+    try {
+      response = elasticsearchClient.indices().get(new GetIndexRequest("*"), RequestOptions.DEFAULT);
+    } catch (IOException ex) {
+      throw new GoraException(ex);
+    }
+    if (response == null) {
+      LOG.error("Could not find indices.");
+      throw new GoraException("Could not find indices.");
+    }
+    return Arrays.asList(response.getIndices());
+  }
+
+  /** Collects the field names and field types declared in the mapping of the given index. */
+  @Override
+  public ElasticsearchStoreCollectionMetadata getTableInfo(String tableName) throws GoraException {
+    GetIndexResponse getIndexResponse;
+    try {
+      getIndexResponse = elasticsearchClient.indices().get(new GetIndexRequest(tableName), RequestOptions.DEFAULT);
+    } catch (IOException ex) {
+      throw new GoraException(ex);
+    }
+    List<String> documentTypes = new ArrayList<>();
+    List<String> documentKeys = new ArrayList<>();
+    MappingMetadata indexMappings = getIndexResponse.getMappings().get(tableName);
+    // No mapping entry or no "properties" section (no mapped field): report an empty collection.
+    Object properties = indexMappings == null ? null : indexMappings.getSourceAsMap().get("properties");
+    if (properties != null) {
+      for (Map.Entry<String, Object> entry : ((Map<String, Object>) properties).entrySet()) {
+        Map<String, Object> subEntry = (Map<String, Object>) entry.getValue();
+        documentTypes.add((String) subEntry.get("type"));
+        documentKeys.add(entry.getKey());
+      }
+    }
+    ElasticsearchStoreCollectionMetadata collectionMetadata = new ElasticsearchStoreCollectionMetadata();
+    collectionMetadata.setDocumentKeys(documentKeys);
+    collectionMetadata.setDocumentTypes(documentTypes);
+    return collectionMetadata;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (elasticsearchClient != null) {
+      this.elasticsearchClient.close();
+    }
+  }
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/package-info.java
new file mode 100644
index 0000000..66f575f
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/store/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains the core classes of the Elasticsearch datastore.
+ */
+package org.apache.gora.elasticsearch.store;
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java
new file mode 100644
index 0000000..236b306
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/AuthenticationType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.utils;
+
+/**
+ * Authentication type to connect to the Elasticsearch server.
+ */
+public enum AuthenticationType {
+  /**
+   * Basic authentication: a username and a password must be provided.
+   */
+  BASIC,
+  /**
+   * Token authentication: an Elasticsearch access token must be provided.
+   */
+  TOKEN,
+  /**
+   * API Key authentication: an Elasticsearch API Key ID and API Key Secret must be provided.
+   */
+  APIKEY
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java
new file mode 100644
index 0000000..4f0ca4a
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchConstants.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.utils;
+
+/**
+ * Constants file for Elasticsearch.
+ */
+public class ElasticsearchConstants {
+  /**
+   * Property indicating whether the Hadoop configuration has priority or not.
+   */
+  public static final String PROP_OVERRIDING = "gora.elasticsearch.override.hadoop.configuration";
+
+  /**
+   * Default host and port for Elasticsearch.
+   */
+  public static final String DEFAULT_HOST = "localhost";
+  public static final int DEFAULT_PORT = 9200;
+
+  /**
+   * Keys used in the Elasticsearch configuration file.
+   */
+  public static final String PROP_HOST = "gora.datastore.elasticsearch.host";
+  public static final String PROP_PORT = "gora.datastore.elasticsearch.port";
+  public static final String PROP_SCHEME = "gora.datastore.elasticsearch.scheme";
+  public static final String PROP_AUTHENTICATIONTYPE = "gora.datastore.elasticsearch.authenticationType";
+  public static final String PROP_USERNAME = "gora.datastore.elasticsearch.username";
+  public static final String PROP_PASSWORD = "gora.datastore.elasticsearch.password";
+  public static final String PROP_AUTHORIZATIONTOKEN = "gora.datastore.elasticsearch.authorizationToken";
+  public static final String PROP_APIKEYID = "gora.datastore.elasticsearch.apiKeyId";
+  public static final String PROP_APIKEYSECRET = "gora.datastore.elasticsearch.apiKeySecret";
+  public static final String PROP_CONNECTTIMEOUT = "gora.datastore.elasticsearch.connectTimeout";
+  public static final String PROP_SOCKETTIMEOUT = "gora.datastore.elasticsearch.socketTimeout";
+  public static final String PROP_IOTHREADCOUNT = "gora.datastore.elasticsearch.ioThreadCount";
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java
new file mode 100644
index 0000000..187b02a
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/ElasticsearchParameters.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.utils;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Properties;
+
+/**
+ * Parameters definitions for Elasticsearch.
+ *
+ * <p>A mutable holder for the connection, authentication and client tuning
+ * settings of an Elasticsearch server. Instances are usually created through
+ * {@link #load(Properties, Configuration)}.
+ */
+public class ElasticsearchParameters {
+
+  /**
+   * Elasticsearch server host.
+   */
+  private String host;
+
+  /**
+   * Elasticsearch server port.
+   */
+  private int port;
+
+  /**
+   * Elasticsearch server scheme.
+   * Optional. If not provided, defaults to http.
+   */
+  private String scheme;
+
+  /**
+   * Authentication type to connect to the server.
+   * Can be BASIC, TOKEN or APIKEY.
+   */
+  private AuthenticationType authenticationType;
+
+  /**
+   * Username to use for server authentication.
+   * Required for BASIC authentication to connect to the server.
+   */
+  private String username;
+
+  /**
+   * Password to use for server authentication.
+   * Required for BASIC authentication to connect to the server.
+   */
+  private String password;
+
+  /**
+   * Authorization token to use for server authentication.
+   * Required for TOKEN authentication to connect to the server.
+   */
+  private String authorizationToken;
+
+  /**
+   * API Key ID to use for server authentication.
+   * Required for APIKEY authentication to connect to the server.
+   */
+  private String apiKeyId;
+
+  /**
+   * API Key Secret to use for server authentication.
+   * Required for APIKEY authentication to connect to the server.
+   */
+  private String apiKeySecret;
+
+  /**
+   * Timeout in milliseconds used for establishing the connection to the server.
+   * Optional. If not provided, defaults to 5000 ms.
+   */
+  private int connectTimeout;
+
+  /**
+   * Timeout in milliseconds used for waiting for data, after establishing the connection to the server.
+   * Optional. If not provided, defaults to 60000 ms.
+   */
+  private int socketTimeout;
+
+  /**
+   * Number of worker threads used by the connection manager.
+   * Optional. If not provided, defaults to 1.
+   */
+  private int ioThreadCount;
+
+  /**
+   * Creates a parameter set with the mandatory connection settings.
+   * All other settings keep their Java default values until set explicitly.
+   *
+   * @param host Elasticsearch server host
+   * @param port Elasticsearch server port
+   */
+  public ElasticsearchParameters(String host, int port) {
+    this.host = host;
+    this.port = port;
+  }
+
+  // Plain accessors; see the field documentation for the meaning of each value.
+
+  public String getHost() {
+    return host;
+  }
+
+  public void setHost(String host) {
+    this.host = host;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
+  public void setPort(int port) {
+    this.port = port;
+  }
+
+  public String getScheme() {
+    return scheme;
+  }
+
+  public void setScheme(String scheme) {
+    this.scheme = scheme;
+  }
+
+  public AuthenticationType getAuthenticationType() {
+    return authenticationType;
+  }
+
+  public void setAuthenticationType(AuthenticationType authenticationType) {
+    this.authenticationType = authenticationType;
+  }
+
+  public String getUsername() {
+    return username;
+  }
+
+  public void setUsername(String username) {
+    this.username = username;
+  }
+
+  public String getPassword() {
+    return password;
+  }
+
+  public void setPassword(String password) {
+    this.password = password;
+  }
+
+  public String getAuthorizationToken() {
+    return authorizationToken;
+  }
+
+  public void setAuthorizationToken(String authorizationToken) {
+    this.authorizationToken = authorizationToken;
+  }
+
+  public String getApiKeyId() {
+    return apiKeyId;
+  }
+
+  public void setApiKeyId(String apiKeyId) {
+    this.apiKeyId = apiKeyId;
+  }
+
+  public String getApiKeySecret() {
+    return apiKeySecret;
+  }
+
+  public void setApiKeySecret(String apiKeySecret) {
+    this.apiKeySecret = apiKeySecret;
+  }
+
+  public int getConnectTimeout() {
+    return connectTimeout;
+  }
+
+  public void setConnectTimeout(int connectTimeout) {
+    this.connectTimeout = connectTimeout;
+  }
+
+  public int getSocketTimeout() {
+    return socketTimeout;
+  }
+
+  public void setSocketTimeout(int socketTimeout) {
+    this.socketTimeout = socketTimeout;
+  }
+
+  public int getIoThreadCount() {
+    return ioThreadCount;
+  }
+
+  public void setIoThreadCount(int ioThreadCount) {
+    this.ioThreadCount = ioThreadCount;
+  }
+
+  /**
+   * Reads Elasticsearch parameters from a properties list.
+   *
+   * <p>Host and port are taken from the Hadoop configuration unless
+   * {@link ElasticsearchConstants#PROP_OVERRIDING} is set to {@code true} in the
+   * properties, in which case they are taken from the properties. In both cases
+   * {@link ElasticsearchConstants#DEFAULT_HOST} and
+   * {@link ElasticsearchConstants#DEFAULT_PORT} are used for missing values.
+   * Every other setting is read from the properties and applied only when present.
+   *
+   * @param properties Properties list
+   * @param conf Hadoop configuration, consulted for host and port
+   * @return Elasticsearch parameters instance
+   * @throws IllegalArgumentException if the authentication type is not the name of an
+   *         {@link AuthenticationType} constant, or if a numeric property cannot be
+   *         parsed as an integer (raised as {@link NumberFormatException})
+   */
+  public static ElasticsearchParameters load(Properties properties, Configuration conf) {
+    ElasticsearchParameters elasticsearchParameters;
+
+    if (!Boolean.parseBoolean(properties.getProperty(ElasticsearchConstants.PROP_OVERRIDING))) {
+      elasticsearchParameters = new ElasticsearchParameters(
+              conf.get(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST),
+              conf.getInt(ElasticsearchConstants.PROP_PORT, ElasticsearchConstants.DEFAULT_PORT));
+    } else {
+      elasticsearchParameters = new ElasticsearchParameters(
+              properties.getProperty(ElasticsearchConstants.PROP_HOST, ElasticsearchConstants.DEFAULT_HOST),
+              Integer.parseInt(properties.getProperty(ElasticsearchConstants.PROP_PORT,
+                      String.valueOf(ElasticsearchConstants.DEFAULT_PORT))));
+    }
+
+    String schemeProperty = properties.getProperty(ElasticsearchConstants.PROP_SCHEME);
+    if (schemeProperty != null) {
+      elasticsearchParameters.setScheme(schemeProperty);
+    }
+
+    // Check the raw string before converting: Enum.valueOf throws a
+    // NullPointerException on a null name, so an absent property must be
+    // skipped here rather than after the conversion.
+    String authenticationTypeProperty = properties.getProperty(ElasticsearchConstants.PROP_AUTHENTICATIONTYPE);
+    if (authenticationTypeProperty != null) {
+      elasticsearchParameters.setAuthenticationType(AuthenticationType.valueOf(authenticationTypeProperty));
+    }
+
+    String usernameProperty = properties.getProperty(ElasticsearchConstants.PROP_USERNAME);
+    if (usernameProperty != null) {
+      elasticsearchParameters.setUsername(usernameProperty);
+    }
+
+    String passwordProperty = properties.getProperty(ElasticsearchConstants.PROP_PASSWORD);
+    if (passwordProperty != null) {
+      elasticsearchParameters.setPassword(passwordProperty);
+    }
+
+    String authorizationTokenProperty = properties.getProperty(ElasticsearchConstants.PROP_AUTHORIZATIONTOKEN);
+    if (authorizationTokenProperty != null) {
+      elasticsearchParameters.setAuthorizationToken(authorizationTokenProperty);
+    }
+
+    String apiKeyIdProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYID);
+    if (apiKeyIdProperty != null) {
+      elasticsearchParameters.setApiKeyId(apiKeyIdProperty);
+    }
+
+    String apiKeySecretProperty = properties.getProperty(ElasticsearchConstants.PROP_APIKEYSECRET);
+    if (apiKeySecretProperty != null) {
+      elasticsearchParameters.setApiKeySecret(apiKeySecretProperty);
+    }
+
+    String connectTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_CONNECTTIMEOUT);
+    if (connectTimeoutProperty != null) {
+      elasticsearchParameters.setConnectTimeout(Integer.parseInt(connectTimeoutProperty));
+    }
+
+    String socketTimeoutProperty = properties.getProperty(ElasticsearchConstants.PROP_SOCKETTIMEOUT);
+    if (socketTimeoutProperty != null) {
+      elasticsearchParameters.setSocketTimeout(Integer.parseInt(socketTimeoutProperty));
+    }
+
+    String ioThreadCountProperty = properties.getProperty(ElasticsearchConstants.PROP_IOTHREADCOUNT);
+    if (ioThreadCountProperty != null) {
+      elasticsearchParameters.setIoThreadCount(Integer.parseInt(ioThreadCountProperty));
+    }
+
+    return elasticsearchParameters;
+  }
+}
diff --git a/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/package-info.java b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/package-info.java
new file mode 100644
index 0000000..49fcb93
--- /dev/null
+++ b/gora-elasticsearch/src/main/java/org/apache/gora/elasticsearch/utils/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains utility classes.
+ */
+package org.apache.gora.elasticsearch.utils;
diff --git a/gora-elasticsearch/src/main/resources/gora-elasticsearch.xsd b/gora-elasticsearch/src/main/resources/gora-elasticsearch.xsd
new file mode 100644
index 0000000..0c7c011
--- /dev/null
+++ b/gora-elasticsearch/src/main/resources/gora-elasticsearch.xsd
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<!--
+    Schema for the gora-elasticsearch mapping file. The document root is a
+    "gora-otd" element holding one or more "class" mappings, each of which
+    holds one or more "field" mappings.
+-->
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
+    <!-- Root element of a mapping document. -->
+    <xs:element name="gora-otd" type="elasticsearch-mapping"/>
+
+    <!-- Content of the root element: at least one "class" element. -->
+    <xs:complexType name="elasticsearch-mapping">
+        <xs:sequence>
+            <xs:element name="class" type="class-mapping" maxOccurs="unbounded"/>
+        </xs:sequence>
+    </xs:complexType>
+
+    <!--
+        A single class mapping: at least one "field" element plus the required
+        "name", "keyClass" and "index" attributes.
+    -->
+    <xs:complexType name="class-mapping">
+        <xs:sequence>
+            <xs:element name="field" type="field-mapping" maxOccurs="unbounded"/>
+        </xs:sequence>
+        <xs:attribute name="name" type="nameClass-types" use="required"/>
+        <xs:attribute name="keyClass" type="keyClass-types" use="required"/>
+        <xs:attribute name="index" type="xs:string" use="required"/>
+    </xs:complexType>
+
+    <!--
+        A single field mapping. "name", "docfield" and "type" are required;
+        "scalingFactor" is optional.
+    -->
+    <xs:complexType name="field-mapping">
+        <xs:attribute name="name" type="fieldName-types" use="required"/>
+        <xs:attribute name="docfield" type="xs:string" use="required"/>
+        <xs:attribute name="type" type="xs:string" use="required"/>
+        <xs:attribute name="scalingFactor" type="xs:string" use="optional"/>
+    </xs:complexType>
+
+    <!-- Allowed key classes: only java.lang.String is accepted. -->
+    <xs:simpleType name="keyClass-types">
+        <xs:restriction base="xs:string">
+            <xs:enumeration value="java.lang.String"/>
+        </xs:restriction>
+    </xs:simpleType>
+
+    <!--
+        Class names: one or more identifiers separated by dots. Each identifier
+        starts with a letter, "_" or "$" and continues with letters, digits,
+        "_" or "$".
+    -->
+    <xs:simpleType name="nameClass-types">
+        <xs:restriction base="xs:string">
+            <xs:pattern value="([\p{L}_$][\p{L}\p{N}_$]*\.)*[\p{L}_$][\p{L}\p{N}_$]*"/>
+        </xs:restriction>
+    </xs:simpleType>
+
+    <!-- Field names: an ASCII letter followed by any number of ASCII letters or digits. -->
+    <xs:simpleType name="fieldName-types">
+        <xs:restriction base="xs:string">
+            <xs:pattern value="[a-zA-Z][a-zA-Z0-9]*"/>
+        </xs:restriction>
+    </xs:simpleType>
+
+</xs:schema>
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java
new file mode 100644
index 0000000..6d4d4c3
--- /dev/null
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/GoraElasticsearchTestDriver.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch;
+
+import org.apache.gora.GoraTestDriver;
+import org.apache.gora.elasticsearch.store.ElasticsearchStore;
+import org.apache.gora.elasticsearch.utils.ElasticsearchConstants;
+import org.apache.gora.store.DataStoreFactory;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.util.Properties;
+
+/**
+ * Helper class for third party tests using gora-elasticsearch backend.
+ *
+ * <p>Runs Elasticsearch inside a Testcontainers-managed Docker container and
+ * publishes the container's host and mapped port through the test configuration.
+ *
+ * @see GoraTestDriver
+ */
+public class GoraElasticsearchTestDriver extends GoraTestDriver {
+
+  private static final String DOCKER_IMAGE = "docker.elastic.co/elasticsearch/elasticsearch:7.10.1";
+  private ElasticsearchContainer elasticsearchContainer;
+
+  /**
+   * Creates the driver and prepares (without starting) the Elasticsearch container.
+   * The container is configured with security enabled and with the password
+   * found in the Gora properties.
+   */
+  public GoraElasticsearchTestDriver() {
+    super(ElasticsearchStore.class);
+    final String elasticPassword = DataStoreFactory.createProps()
+            .getProperty(ElasticsearchConstants.PROP_PASSWORD);
+    elasticsearchContainer = new ElasticsearchContainer(DOCKER_IMAGE)
+            .withEnv("ELASTIC_PASSWORD", elasticPassword)
+            .withEnv("xpack.security.enabled", "true");
+  }
+
+  /**
+   * Starts the Elasticsearch container and records its host and the port
+   * mapped to the default Elasticsearch port in the configuration.
+   */
+  @Override
+  public void setUpClass() throws Exception {
+    elasticsearchContainer.start();
+    log.info("Setting up Elasticsearch test driver");
+
+    final int mappedPort = elasticsearchContainer.getMappedPort(ElasticsearchConstants.DEFAULT_PORT);
+    final String containerHost = elasticsearchContainer.getContainerIpAddress();
+
+    conf.set(ElasticsearchConstants.PROP_PORT, Integer.toString(mappedPort));
+    conf.set(ElasticsearchConstants.PROP_HOST, containerHost);
+  }
+
+  /**
+   * Closes the Elasticsearch container.
+   */
+  @Override
+  public void tearDownClass() throws Exception {
+    elasticsearchContainer.close();
+    log.info("Tearing down Elasticsearch test driver");
+  }
+}
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java
new file mode 100644
index 0000000..c05c441
--- /dev/null
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/ElasticsearchStoreMapReduceTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.mapreduce;
+
+import org.apache.gora.elasticsearch.GoraElasticsearchTestDriver;
+import org.apache.gora.examples.generated.WebPage;
+import org.apache.gora.mapreduce.DataStoreMapReduceTestBase;
+import org.apache.gora.mapreduce.MapReduceTestUtils;
+import org.apache.gora.store.DataStore;
+import org.apache.gora.store.DataStoreFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Executes tests for MR jobs over Elasticsearch dataStore.
+ *
+ * <p>An Elasticsearch test driver is set up before and torn down after every
+ * test method.
+ */
+public class ElasticsearchStoreMapReduceTest extends DataStoreMapReduceTestBase {
+
+  private GoraElasticsearchTestDriver driver;
+
+  /**
+   * Creates the test instance together with its Elasticsearch test driver.
+   *
+   * @throws IOException declared by the base class constructor
+   */
+  public ElasticsearchStoreMapReduceTest() throws IOException {
+    super();
+    driver = new GoraElasticsearchTestDriver();
+  }
+
+  /**
+   * Sets up the driver first so that it is ready before the base class set-up runs.
+   */
+  @Override
+  @Before
+  public void setUp() throws Exception {
+    driver.setUpClass();
+    super.setUp();
+  }
+
+  /**
+   * Runs the base class tear-down and then tears the driver down.
+   */
+  @Override
+  @After
+  public void tearDown() throws Exception {
+    try {
+      super.tearDown();
+    } finally {
+      // Always release the driver, even when the base class tear-down fails;
+      // otherwise it would not be torn down after a failed test.
+      driver.tearDownClass();
+    }
+  }
+
+  /**
+   * Creates a WebPage data store using the driver's configuration.
+   */
+  @Override
+  protected DataStore<String, WebPage> createWebPageDataStore() throws IOException {
+    return DataStoreFactory.getDataStore(String.class, WebPage.class, driver.getConfiguration());
+  }
+
+  /**
+   * Runs the count-query test with the driver's configuration.
+   */
+  @Test
+  @Override
+  public void testCountQuery() throws Exception {
+    MapReduceTestUtils.testCountQuery(createWebPageDataStore(), driver.getConfiguration());
+  }
+}
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/package-info.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/package-info.java
new file mode 100644
index 0000000..781ce61
--- /dev/null
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/mapreduce/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains map reduce tests.
+ */
+package org.apache.gora.elasticsearch.mapreduce;
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/package-info.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/package-info.java
new file mode 100644
index 0000000..406f829
--- /dev/null
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains Elasticsearch datastore test utilities.
+ */
+package org.apache.gora.elasticsearch;
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java
new file mode 100644
index 0000000..fb7fd0d
--- /dev/null
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/TestElasticsearchStore.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gora.elasticsearch.store;
+
+import org.apache.gora.elasticsearch.GoraElasticsearchTestDriver;
+import org.apache.gora.elasticsearch.mapping.ElasticsearchMapping;
+import org.apache.gora.elasticsearch.mapping.Field;
+import org.apache.gora.elasticsearch.utils.AuthenticationType;
+import org.apache.gora.elasticsearch.utils.ElasticsearchParameters;
+import org.apache.gora.examples.generated.EmployeeInt;
+import org.apache.gora.store.DataStoreFactory;
+import org.apache.gora.store.DataStoreMetadataFactory;
+import org.apache.gora.store.DataStoreTestBase;
+import org.apache.gora.store.impl.DataStoreMetadataAnalyzer;
+import org.apache.gora.util.GoraException;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Test case for ElasticsearchStore.
+ */
+public class TestElasticsearchStore extends DataStoreTestBase {
+
+  static {
+    setTestDriver(new GoraElasticsearchTestDriver());
+  }
+
+  /**
+   * Checks that the employee store exposes the index name and field mapping
+   * declared for the Employee class in the test mapping file.
+   */
+  @Test
+  public void testInitialize() throws GoraException {
+    log.info("test method: testInitialize");
+
+    ElasticsearchMapping mapping = ((ElasticsearchStore) employeeStore).getMapping();
+
+    Map<String, Field> fields = new HashMap<>();
+    fields.put("name", new Field("name", new Field.FieldType(Field.DataType.TEXT)));
+    fields.put("dateOfBirth", new Field("dateOfBirth", new Field.FieldType(Field.DataType.LONG)));
+    fields.put("ssn", new Field("ssn", new Field.FieldType(Field.DataType.TEXT)));
+    fields.put("value", new Field("value", new Field.FieldType(Field.DataType.TEXT)));
+    fields.put("salary", new Field("salary", new Field.FieldType(Field.DataType.INTEGER)));
+    fields.put("boss", new Field("boss", new Field.FieldType(Field.DataType.OBJECT)));
+    fields.put("webpage", new Field("webpage", new Field.FieldType(Field.DataType.OBJECT)));
+
+    Assert.assertEquals("frontier", employeeStore.getSchemaName());
+    Assert.assertEquals("frontier", mapping.getIndexName());
+    Assert.assertEquals(fields, mapping.getFields());
+  }
+
+  /**
+   * Checks that connection and BASIC authentication settings are loaded from
+   * the Gora properties.
+   */
+  @Test
+  public void testLoadElasticsearchParameters() throws IOException {
+    log.info("test method: testLoadElasticsearchParameters");
+
+    Properties properties = DataStoreFactory.createProps();
+
+    ElasticsearchParameters parameters = ElasticsearchParameters.load(properties, testDriver.getConfiguration());
+
+    Assert.assertEquals("localhost", parameters.getHost());
+    Assert.assertEquals(AuthenticationType.BASIC, parameters.getAuthenticationType());
+    Assert.assertEquals("elastic", parameters.getUsername());
+    Assert.assertEquals("password", parameters.getPassword());
+  }
+
+  /**
+   * Creating a store from the invalid mapping file must fail when XSD
+   * validation is enabled.
+   */
+  @Test(expected = GoraException.class)
+  public void testInvalidXmlFile() throws Exception {
+    log.info("test method: testInvalidXmlFile");
+
+    Properties properties = DataStoreFactory.createProps();
+    properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml");
+    properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "true");
+    testDriver.createDataStore(String.class, EmployeeInt.class, properties);
+  }
+
+  /**
+   * Creating a store from the invalid mapping file must not throw when XSD
+   * validation is disabled.
+   */
+  @Test
+  public void testXsdValidationParameter() throws GoraException {
+    log.info("test method: testXsdValidationParameter");
+
+    Properties properties = DataStoreFactory.createProps();
+    properties.setProperty(ElasticsearchStore.PARSE_MAPPING_FILE_KEY, "gora-elasticsearch-mapping-invalid.xml");
+    properties.setProperty(ElasticsearchStore.XSD_VALIDATION, "false");
+    testDriver.createDataStore(String.class, EmployeeInt.class, properties);
+  }
+
+  /**
+   * Checks the store type reported by the metadata analyzer.
+   */
+  @Test
+  public void testGetType() throws GoraException, ClassNotFoundException {
+    Configuration conf = testDriver.getConfiguration();
+    DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
+
+    String actualType = storeMetadataAnalyzer.getType();
+    String expectedType = "ELASTICSEARCH";
+    Assert.assertEquals(expectedType, actualType);
+  }
+
+  /**
+   * Checks the table (index) names reported by the metadata analyzer.
+   */
+  @Test
+  public void testGetTablesNames() throws GoraException, ClassNotFoundException {
+    Configuration conf = testDriver.getConfiguration();
+    DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
+
+    List<String> actualTablesNames = new ArrayList<>(storeMetadataAnalyzer.getTablesNames());
+    List<String> expectedTablesNames = new ArrayList<>();
+    expectedTablesNames.add("frontier");
+    expectedTablesNames.add("webpage");
+    Assert.assertEquals(expectedTablesNames, actualTablesNames);
+  }
+
+  /**
+   * Checks the document keys and types reported for the "frontier" index.
+   */
+  @Test
+  public void testGetTableInfo() throws GoraException, ClassNotFoundException {
+    Configuration conf = testDriver.getConfiguration();
+    DataStoreMetadataAnalyzer storeMetadataAnalyzer = DataStoreMetadataFactory.createAnalyzer(conf);
+
+    ElasticsearchStoreCollectionMetadata actualCollectionMetadata =
+            (ElasticsearchStoreCollectionMetadata) storeMetadataAnalyzer.getTableInfo("frontier");
+
+    List<String> expectedDocumentKeys = new ArrayList<>();
+    expectedDocumentKeys.add("name");
+    expectedDocumentKeys.add("dateOfBirth");
+    expectedDocumentKeys.add("ssn");
+    expectedDocumentKeys.add("value");
+    expectedDocumentKeys.add("salary");
+    expectedDocumentKeys.add("boss");
+    expectedDocumentKeys.add("webpage");
+    expectedDocumentKeys.add("gora_id");
+
+    List<String> expectedDocumentTypes = new ArrayList<>();
+    expectedDocumentTypes.add("text");
+    expectedDocumentTypes.add("long");
+    expectedDocumentTypes.add("text");
+    expectedDocumentTypes.add("text");
+    expectedDocumentTypes.add("integer");
+    expectedDocumentTypes.add("object");
+    expectedDocumentTypes.add("object");
+    expectedDocumentTypes.add("keyword");
+
+    // Compare the key count against the keys themselves, not against the types.
+    Assert.assertEquals(expectedDocumentKeys.size(), actualCollectionMetadata.getDocumentKeys().size());
+    Assert.assertTrue(expectedDocumentKeys.containsAll(actualCollectionMetadata.getDocumentKeys()));
+
+    Assert.assertEquals(expectedDocumentTypes.size(), actualCollectionMetadata.getDocumentTypes().size());
+    Assert.assertTrue(expectedDocumentTypes.containsAll(actualCollectionMetadata.getDocumentTypes()));
+  }
+
+  @Ignore("Elasticsearch doesn't support 3 types union field yet")
+  @Override
+  public void testGet3UnionField() {
+  }
+}
diff --git a/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/package-info.java b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/package-info.java
new file mode 100644
index 0000000..cd77199
--- /dev/null
+++ b/gora-elasticsearch/src/test/java/org/apache/gora/elasticsearch/store/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains Elasticsearch datastore tests.
+ */
+package org.apache.gora.elasticsearch.store;
diff --git a/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping-invalid.xml b/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping-invalid.xml
new file mode 100644
index 0000000..c1a8ccd
--- /dev/null
+++ b/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping-invalid.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<gora-otd xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:noNamespaceSchemaLocation="gora-elasticsearch.xsd">
+
+    <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" index="frontier">
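+        <!--
+            The required "type" attribute is deliberately omitted from the field
+            below so that XSD validation rejects this mapping. The valid form is:
+            <field name="name" docfield="name" type="text"/>
+        -->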
+        <field name="name" docfield="name"/>
+    </class>
+
+</gora-otd>
diff --git a/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping.xml b/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping.xml
new file mode 100644
index 0000000..13a8d08
--- /dev/null
+++ b/gora-elasticsearch/src/test/resources/gora-elasticsearch-mapping.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<gora-otd xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:noNamespaceSchemaLocation="gora-elasticsearch.xsd">
+
+    <class name="org.apache.gora.examples.generated.Employee" keyClass="java.lang.String" index="frontier">
+        <field name="name" docfield="name" type="text"/>
+        <field name="dateOfBirth" docfield="dateOfBirth" type="long"/>
+        <field name="ssn" docfield="ssn" type="text"/>
+        <field name="value" docfield="value" type="text"/>
+        <field name="salary" docfield="salary" type="integer"/>
+        <field name="boss" docfield="boss" type="object"/>
+        <field name="webpage" docfield="webpage" type="object"/>
+    </class>
+
+    <class name="org.apache.gora.examples.generated.WebPage" keyClass="java.lang.String" index="webpage">
+        <field name="url" docfield="url" type="text"/>
+        <field name="content" docfield="content" type="binary"/>
+        <field name="parsedContent" docfield="pContent" type="text"/>
+        <field name="outlinks" docfield="links.out" type="object"/>
+        <field name="headers" docfield="headers" type="object"/>
+        <field name="metadata" docfield="metadata" type="object"/>
+        <field name="byteData" docfield="byteData" type="object"/>
+        <field name="stringData" docfield="stringData" type="object"/>
+    </class>
+
+    <class name="org.apache.gora.examples.generated.TokenDatum" keyClass="java.lang.String" index="TokenDatum">
+        <field name="count" docfield="count" type="integer"/>
+    </class>
+
+</gora-otd>
diff --git a/gora-elasticsearch/src/test/resources/gora.properties b/gora-elasticsearch/src/test/resources/gora.properties
new file mode 100644
index 0000000..99842a9
--- /dev/null
+++ b/gora-elasticsearch/src/test/resources/gora.properties
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+gora.datastore.default=org.apache.gora.elasticsearch.store.ElasticsearchStore
+gora.elasticsearch.override.hadoop.configuration=false
+gora.datastore.elasticsearch.host=localhost
+gora.datastore.elasticsearch.port=9200
+#gora.datastore.elasticsearch.scheme=
+
+#Basic authentication requires a username and a password.
+gora.datastore.elasticsearch.authenticationType=BASIC
+#Default username to connect to the Elasticsearch Docker container.
+gora.datastore.elasticsearch.username=elastic
+gora.datastore.elasticsearch.password=password
+
+#Authentication with an Elasticsearch access token.
+#gora.datastore.elasticsearch.authenticationType=TOKEN
+#gora.datastore.elasticsearch.authorizationToken=
+
+#Authentication with an Elasticsearch API Key requires an API Key ID and an API Key Secret.
+#gora.datastore.elasticsearch.authenticationType=APIKEY
+#gora.datastore.elasticsearch.apiKeyId=
+#gora.datastore.elasticsearch.apiKeySecret=
+
+#Additional configuration parameters for timeouts and threads.
+#gora.datastore.elasticsearch.connectTimeout=
+#gora.datastore.elasticsearch.socketTimeout=
+#gora.datastore.elasticsearch.ioThreadCount=
diff --git a/pom.xml b/pom.xml
index f96c2ea..590dcfc 100755
--- a/pom.xml
+++ b/pom.xml
@@ -816,6 +816,7 @@
     <module>gora-redis</module>
     <module>gora-jet</module>
     <module>gora-rethinkdb</module>
+    <module>gora-elasticsearch</module>
     <module>gora-tutorial</module>
     <module>gora-benchmark</module>
     <module>sources-dist</module>
@@ -839,7 +840,7 @@
     <cassandra-driver.version>3.9.0</cassandra-driver.version>
     <cassandra.version>3.11.0</cassandra.version>
     <!-- Ignite Dependencies -->
-    <ignite.version>2.9.0</ignite.version>
+    <ignite.version>2.10.0</ignite.version>
     <sqlbuilder.version>2.1.7</sqlbuilder.version>
     <!-- Redis Dependencies -->
     <redisson.version>3.11.0</redisson.version>
@@ -855,9 +856,11 @@
     <restlet.version>2.4.0</restlet.version>
     <!-- Flink Dependencies -->
     <flink.version>1.6.2</flink.version>
+    <!-- Elasticsearch Dependencies -->
+    <elasticsearch.version>7.10.1</elasticsearch.version>
 
     <spark.version>2.2.1</spark.version>
-    <aerospike.version>4.2.2</aerospike.version>
+    <aerospike.version>5.0.6</aerospike.version>
     <!-- Misc Dependencies -->
     <guava.version>13.0</guava.version>
     <commons-lang.version>2.6</commons-lang.version>
@@ -1201,6 +1204,13 @@
         <version>${redisson.version}</version>
       </dependency>
 
+      <!-- Elasticsearch dependencies -->
+      <dependency>
+        <groupId>org.elasticsearch.client</groupId>
+        <artifactId>elasticsearch-rest-high-level-client</artifactId>
+        <version>${elasticsearch.version}</version>
+      </dependency>
+
       <!--Hadoop dependencies -->
       <dependency>
         <groupId>org.apache.hadoop</groupId>