Add RethinkDB query execution
diff --git a/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/query/RethinkDBQuery.java b/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/query/RethinkDBQuery.java
index e705f73..4eb8d6f 100644
--- a/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/query/RethinkDBQuery.java
+++ b/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/query/RethinkDBQuery.java
@@ -18,16 +18,22 @@
 
 package org.apache.gora.rethinkdb.query;
 
+import com.rethinkdb.RethinkDB;
+import com.rethinkdb.gen.ast.ReqlExpr;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.impl.QueryBase;
+import org.apache.gora.rethinkdb.store.RethinkDBMapping;
+import org.apache.gora.rethinkdb.store.RethinkDBStoreParameters;
 import org.apache.gora.store.DataStore;
 
-
 /**
  * RethinkDB specific implementation of the {@link org.apache.gora.query.Query} interface.
  */
 public class RethinkDBQuery<K, T extends PersistentBase> extends QueryBase<K, T> {
 
+  public static final RethinkDB r = RethinkDB.r;
+  private ReqlExpr dbQuery;
+
   public RethinkDBQuery() {
     super(null);
   }
@@ -36,4 +42,58 @@
     super(dataStore);
   }
 
+  public Object populateRethinkDBQuery(final RethinkDBMapping rethinkDBMapping,
+                                       final RethinkDBStoreParameters rethinkDBStoreParameters,
+                                       final String[] fields,
+                                       final String[] schemaFields) {
+    if ((this.getStartKey() != null) && (this.getEndKey() != null)
+            && this.getStartKey().equals(this.getEndKey())) {
+      dbQuery = r.db(rethinkDBStoreParameters.getDatabaseName())
+              .table(rethinkDBMapping.getDocumentClass())
+              .filter(row -> row.g("id").eq(this.getStartKey()));
+    } else if (this.getStartKey() != null || this.getEndKey() != null) {
+      if (this.getStartKey() != null
+              && this.getEndKey() == null) {
+        dbQuery = r.db(rethinkDBStoreParameters.getDatabaseName())
+                .table(rethinkDBMapping.getDocumentClass())
+                .filter(row -> row.g("id").ge(this.getStartKey()));
+      } else if (this.getEndKey() != null
+              && this.getStartKey() == null) {
+        dbQuery = r.db(rethinkDBStoreParameters.getDatabaseName())
+                .table(rethinkDBMapping.getDocumentClass())
+                .filter(row -> row.g("id").le(this.getEndKey()));
+      } else {
+        dbQuery = r.db(rethinkDBStoreParameters.getDatabaseName())
+                .table(rethinkDBMapping.getDocumentClass())
+                .filter(row -> row.g("id").ge(this.getStartKey()).
+                        and(row.g("id").le(this.getEndKey())));
+      }
+    } else {
+      dbQuery = r.db(rethinkDBStoreParameters.getDatabaseName())
+              .table(rethinkDBMapping.getDocumentClass());
+    }
+
+    if (fields.length == schemaFields.length) {
+      // all
+    } else {
+      String[] projection = new String[fields.length + 1];
+      int counter = 0;
+      for (String k : fields) {
+        String dbFieldName = rethinkDBMapping.getDocumentField(k);
+        if (dbFieldName != null && dbFieldName.length() > 0) {
+          projection[counter] = dbFieldName;
+          counter++;
+        }
+      }
+      projection[counter] = "id";
+      dbQuery = dbQuery.pluck(projection);
+    }
+
+    return dbQuery;
+  }
+
+  public ReqlExpr getRethinkDBDbQuery() {
+    return dbQuery;
+  }
+
 }
diff --git a/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/query/RethinkDBResult.java b/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/query/RethinkDBResult.java
index 21bcab1..e5ddbdc 100644
--- a/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/query/RethinkDBResult.java
+++ b/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/query/RethinkDBResult.java
@@ -19,7 +19,9 @@
 package org.apache.gora.rethinkdb.query;
 
 import java.io.IOException;
+import java.util.Iterator;
 
+import com.rethinkdb.model.MapObject;
 import org.apache.gora.rethinkdb.store.RethinkDBStore;
 import org.apache.gora.persistency.impl.PersistentBase;
 import org.apache.gora.query.Query;
@@ -30,38 +32,65 @@
 
 /**
  * RethinkDB specific implementation of the {@link org.apache.gora.query.Result} interface.
- *
  */
 public class RethinkDBResult<K, T extends PersistentBase> extends ResultBase<K, T> {
 
+  private com.rethinkdb.net.Result<MapObject> resultSet;
   private int size;
+  private Iterator<MapObject> resultSetIterator;
   private static final Logger log = LoggerFactory.getLogger(RethinkDBResult.class);
 
   public RethinkDBResult(DataStore<K, T> dataStore, Query<K, T> query) {
     super(dataStore, query);
   }
 
+  public RethinkDBResult(DataStore<K, T> dataStore,
+                         Query<K, T> query,
+                         com.rethinkdb.net.Result<MapObject> resultSet) {
+    super(dataStore, query);
+    this.resultSet = resultSet;
+    this.resultSetIterator = resultSet.iterator();
+    this.size = resultSet.bufferedCount();
+  }
+
   public RethinkDBStore<K, T> getDataStore() {
     return (RethinkDBStore<K, T>) super.getDataStore();
   }
 
   @Override
   public float getProgress() throws IOException {
-    return 0L;
+    if (resultSet == null) {
+      return 0;
+    } else if (size == 0) {
+      return 1;
+    } else {
+      return offset / (float) size;
+    }
   }
 
   @Override
   public void close() throws IOException {
+    resultSet.close();
   }
 
   @Override
   protected boolean nextInner() throws IOException {
-    return true;
+    if (!resultSetIterator.hasNext()) {
+      return false;
+    }
+
+    MapObject<String, Object> obj = resultSetIterator.next();
+    key = (K) obj.get("id");
+    persistent = ((RethinkDBStore<K, T>) getDataStore())
+            .convertRethinkDBDocToAvroBean(obj, getQuery().getFields());
+    return persistent != null;
   }
 
   @Override
   public int size() {
-    return 0;
+    int totalSize = size;
+    int intLimit = (int) this.limit;
+    return intLimit > 0 && totalSize > intLimit ? intLimit : totalSize;
   }
 
 }
diff --git a/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/store/RethinkDBStore.java b/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/store/RethinkDBStore.java
index cb5d81a..9a50704 100644
--- a/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/store/RethinkDBStore.java
+++ b/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/store/RethinkDBStore.java
@@ -24,11 +24,13 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.List;
 import java.util.ArrayList;
 
 import com.rethinkdb.RethinkDB;
+import com.rethinkdb.gen.ast.ReqlExpr;
 import com.rethinkdb.model.MapObject;
 import com.rethinkdb.net.Connection;
 import org.apache.avro.Schema;
@@ -42,6 +44,7 @@
 import org.apache.gora.query.Query;
 import org.apache.gora.query.Result;
 import org.apache.gora.query.impl.PartitionQueryImpl;
+import org.apache.gora.rethinkdb.query.RethinkDBResult;
 import org.apache.gora.store.impl.DataStoreBase;
 import org.apache.gora.util.AvroUtils;
 import org.apache.gora.util.ClassLoadingUtils;
@@ -176,7 +179,7 @@
     try {
       boolean isExists = r.db(rethinkDBStoreParameters.getDatabaseName())
               .table(rethinkDBMapping.getDocumentClass())
-              .get(key)
+              .getAll(key)
               .count()
               .run(connection, Boolean.class)
               .first();
@@ -256,7 +259,43 @@
    */
   @Override
   public long deleteByQuery(Query<K, T> query) throws GoraException {
-    return 0L;
+    if (query.getFields() == null || (query.getFields().length == getFields().length)) {
+      String[] fields = getFieldsToQuery(query.getFields());
+      RethinkDBQuery dataStoreQuery = ((RethinkDBQuery) query);
+      dataStoreQuery.populateRethinkDBQuery(rethinkDBMapping, rethinkDBStoreParameters, fields, getFields());
+      ReqlExpr reqlExpr = dataStoreQuery.getRethinkDBDbQuery();
+      MapObject<String, Object> document = reqlExpr.delete().run(connection, MapObject.class).first();
+      int deleteCount = Integer.valueOf(document.get("deleted").toString());
+      if (deleteCount > 0) {
+        return deleteCount;
+      } else {
+        return 0;
+      }
+    } else {
+      RethinkDBQuery<K, T> dataStoreQuery = new RethinkDBQuery<>(this);
+      dataStoreQuery.setStartKey(query.getStartKey());
+      dataStoreQuery.setEndKey(query.getEndKey());
+      dataStoreQuery.populateRethinkDBQuery(rethinkDBMapping, rethinkDBStoreParameters,
+              getFieldsToQuery(null), getFields());
+      ReqlExpr reqlExpr = dataStoreQuery.getRethinkDBDbQuery();
+      String[] projection = new String[query.getFields().length];
+      int counter = 0;
+      for (String k : query.getFields()) {
+        String dbFieldName = rethinkDBMapping.getDocumentField(k);
+        if (dbFieldName != null && dbFieldName.length() > 0) {
+          projection[counter] = dbFieldName;
+          counter++;
+        }
+      }
+      MapObject<String, Object> document = reqlExpr.replace(row -> row.without(projection))
+              .run(connection, MapObject.class).first();
+      int replacedCount = Integer.valueOf(document.get("replaced").toString());
+      if (replacedCount > 0) {
+        return replacedCount;
+      } else {
+        return 0;
+      }
+    }
   }
 
   /**
@@ -264,8 +303,18 @@
    */
   @Override
   public Result<K, T> execute(Query<K, T> query) throws GoraException {
+    String[] fields = getFieldsToQuery(query.getFields());
+    RethinkDBQuery dataStoreQuery;
+    if (query instanceof RethinkDBQuery) {
+      dataStoreQuery = ((RethinkDBQuery) query);
+    } else {
+      dataStoreQuery = (RethinkDBQuery) ((PartitionQueryImpl<K, T>) query).getBaseQuery();
+    }
+    dataStoreQuery.populateRethinkDBQuery(rethinkDBMapping, rethinkDBStoreParameters, fields, getFields());
     try {
-      return null;
+      ReqlExpr reqlExpr = dataStoreQuery.getRethinkDBDbQuery();
+      com.rethinkdb.net.Result<MapObject> result = reqlExpr.run(connection, MapObject.class);
+      return new RethinkDBResult<>(this, query, result);
     } catch (Exception e) {
       throw new GoraException(e);
     }
@@ -542,7 +591,8 @@
         result = convertDocFieldToAvroList(docf, fieldSchema, obj, field, storeType);
         break;
       case RECORD:
-        MapObject<String, Object> record = (MapObject<String, Object>) obj.get(docf);
+        MapObject<String, Object> record = (MapObject<String, Object>)
+                decorateMapToODoc((Map<String, Object>) obj.get(docf));
         if (record == null) {
           result = null;
           break;
@@ -655,7 +705,8 @@
                                            final Schema.Field f,
                                            final RethinkDBMapping.DocumentFieldType storeType) throws GoraException {
 
-    if (storeType == RethinkDBMapping.DocumentFieldType.LIST) {
+    if (storeType == RethinkDBMapping.DocumentFieldType.LIST
+            || storeType == null) {
       List<Object> list = (List<Object>) doc.get(docf);
       List<Object> rlist = new ArrayList<>();
       if (list == null) {
@@ -692,7 +743,8 @@
       }
       return new DirtyMapWrapper<>(rmap);
     } else {
-      MapObject<String, Object> innerDoc = (MapObject<String, Object>) doc.get(docf);
+      MapObject<String, Object> innerDoc = (MapObject<String, Object>)
+              decorateMapToODoc((Map<String, Object>) doc.get(docf));
       Map<Utf8, Object> rmap = new HashMap<>();
       if (innerDoc == null) {
         return new DirtyMapWrapper(rmap);
@@ -709,6 +761,9 @@
   }
 
   private MapObject<String, Object> decorateMapToODoc(Map<String, Object> map) {
+    if (Objects.isNull(map)) {
+      return null;
+    }
     MapObject<String, Object> doc = new MapObject();
     for (Map.Entry entry : map.entrySet()) {
       doc.put(entry.getKey().toString(), entry.getValue());
diff --git a/gora-rethinkdb/src/test/java/org/apache/gora/rethinkdb/store/RethinkDBGoraDataStoreTest.java b/gora-rethinkdb/src/test/java/org/apache/gora/rethinkdb/store/RethinkDBGoraDataStoreTest.java
index aecfdd3..0270da2 100644
--- a/gora-rethinkdb/src/test/java/org/apache/gora/rethinkdb/store/RethinkDBGoraDataStoreTest.java
+++ b/gora-rethinkdb/src/test/java/org/apache/gora/rethinkdb/store/RethinkDBGoraDataStoreTest.java
@@ -18,17 +18,14 @@
 
 package org.apache.gora.rethinkdb.store;
 
-
 import org.apache.gora.rethinkdb.RethinkDBTestDriver;
 import org.apache.gora.store.DataStoreTestBase;
-import org.apache.gora.store.DataStoreTestUtil;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Ignore;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.FixedHostPortGenericContainer;
@@ -72,13 +69,6 @@
     super.tearDown();
   }
 
-  @Override
-  @Test
-  public void testGet() throws Exception {
-    log.info("test method: testGet");
-    DataStoreTestUtil.testGetEmployee(this.employeeStore);
-  }
-
   @Ignore("3 types union field is not supported by OrientDBStore.")
   @Override
   public void testGet3UnionField() throws Exception {