Merge remote-tracking branch 'arjansh/bug/fieldnames-with-dots-in-elasticsearch'
diff --git a/CHANGES.md b/CHANGES.md
index 41fb5bd..67fcf4c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,5 +1,6 @@
 ### Apache MetaModel [WIP]
 
+ * [METAMODEL-1228] - Better handling of fieldnames with dots in Elasticsearch
  * [METAMODEL-1227] - Better handling of nested objects in Elasticsearch data
  * [METAMODEL-1224] - Ensured compatibility with newer versions of PostgreSQL
 
diff --git a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
index 6ab4fa6..f5c70e8 100644
--- a/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
+++ b/elasticsearch/common/src/main/java/org/apache/metamodel/elasticsearch/common/ElasticSearchUtils.java
@@ -290,6 +290,23 @@
                         } else {
                             values[i] = valueToDate;
                         }
+                    } else if (column.getType() == ColumnType.MAP && value == null) {
+                        // Because of a bug in Elasticsearch, when field names contain dots, it's possible that the
+                        // mapping of the index describes a column as being of the type "MAP", while the document
+                        // source only holds a number of flat fields containing dots in their names. In this case we
+                        // work around that inconsistency by building the nested map ourselves from those fields.
+                        final Map<String, Object> valueMap = new HashMap<>();
+
+                        sourceMap
+                                .keySet()
+                                .stream()
+                                .filter(fieldName -> fieldName.startsWith(column.getName() + "."))
+                                .forEach(fieldName -> evaluateField(sourceMap, valueMap, fieldName, fieldName
+                                        .substring(fieldName.indexOf('.') + 1)));
+
+                        if (!valueMap.isEmpty()) {
+                            values[i] = valueMap;
+                        }
                     } else {
                         values[i] = value;
                     }
@@ -299,4 +316,26 @@
 
         return new DefaultRow(header, values);
     }
+
+    /**
+     * Stores the value of a dotted source field in {@code valueMap}, creating one nested map per
+     * dot-separated segment of {@code subFieldName}.
+     *
+     * @param sourceFieldName the full key under which the value is stored in {@code sourceMap}
+     * @param subFieldName the path below {@code valueMap} that is still to be resolved; may contain dots
+     */
+    private static void evaluateField(final Map<String, Object> sourceMap, final Map<String, Object> valueMap,
+            final String sourceFieldName, final String subFieldName) {
+        final int separatorIndex = subFieldName.indexOf('.');
+        if (separatorIndex < 0) {
+            valueMap.put(subFieldName, sourceMap.get(sourceFieldName));
+            return;
+        }
+        // The mapping function must not touch valueMap itself: HashMap.computeIfAbsent stores the result and
+        // throws ConcurrentModificationException (Java 9+) when the function modifies the map.
+        @SuppressWarnings("unchecked")
+        final Map<String, Object> nestedValueMap = (Map<String, Object>) valueMap
+                .computeIfAbsent(subFieldName.substring(0, separatorIndex), key -> new HashMap<String, Object>());
+        evaluateField(sourceMap, nestedValueMap, sourceFieldName, subFieldName.substring(separatorIndex + 1));
+    }
 }
diff --git a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestNestedDataIT.java b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestNestedDataIT.java
index 0035679..94f078e 100644
--- a/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestNestedDataIT.java
+++ b/elasticsearch/rest/src/test/java/org/apache/metamodel/elasticsearch/rest/ElasticSearchRestNestedDataIT.java
@@ -38,6 +38,7 @@
 import org.elasticsearch.client.RequestOptions;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.elasticsearch.client.indices.CreateIndexRequest;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -67,9 +68,13 @@
 
     @Test
     public void testNestedData() throws Exception {
+        final Map<String, Object> address = new HashMap<>();
+        address.put("street", "Main street 1");
+        address.put("city", "Newville");
+
         final Map<String, Object> user = new HashMap<>();
         user.put("fullname", "John Doe");
-        user.put("address", "Main street 1, Newville");
+        user.put("address", address);
 
         final Map<String, Object> userMessage = new LinkedHashMap<>();
         userMessage.put("user", user);
@@ -80,6 +85,26 @@
 
         client.index(indexRequest, RequestOptions.DEFAULT);
 
+        validateSchemaAndResults();
+    }
+
+    /** Indexes a flat document whose keys contain dots, then runs the shared schema and result validation. */
+    @Test
+    public void testIndexOfDocumentWithDots() throws Exception {
+        // Flat JSON: the nested structure is only implied by the dots in the field names.
+        final String flatJson =
+                "{ \"user.fullname\": \"John Doe\", "
+                + "\"user.address.street\": \"Main street 1\", "
+                + "\"user.address.city\": \"Newville\", "
+                + "\"message\": \"This is what I have to say.\" }";
+        final IndexRequest request = new IndexRequest(INDEX_NAME).id("1").source(flatJson, XContentType.JSON);
+
+        client.index(request, RequestOptions.DEFAULT);
+
+        validateSchemaAndResults();
+    }
+
+    private void validateSchemaAndResults() {
         final Table table = dataContext.getDefaultSchema().getTableByName(DEFAULT_TABLE_NAME);
 
         assertThat(table.getColumnNames(), containsInAnyOrder("_id", "message", "user"));
@@ -107,7 +132,14 @@
             @SuppressWarnings("rawtypes")
             final Map userValueMap = (Map) userValue;
             assertEquals("John Doe", userValueMap.get("fullname"));
-            assertEquals("Main street 1, Newville", userValueMap.get("address"));
+
+            final Object addressValue = userValueMap.get("address");
+            assertTrue(addressValue instanceof Map);
+
+            @SuppressWarnings("rawtypes")
+            final Map addressValueMap = (Map) addressValue;
+            assertEquals("Main street 1", addressValueMap.get("street"));
+            assertEquals("Newville", addressValueMap.get("city"));
         }
     }
 }