Add RethinkDB query execution
diff --git a/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/query/RethinkDBQuery.java b/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/query/RethinkDBQuery.java
index e705f73..4eb8d6f 100644
--- a/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/query/RethinkDBQuery.java
+++ b/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/query/RethinkDBQuery.java
@@ -18,16 +18,22 @@
package org.apache.gora.rethinkdb.query;
+import com.rethinkdb.RethinkDB;
+import com.rethinkdb.gen.ast.ReqlExpr;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.impl.QueryBase;
+import org.apache.gora.rethinkdb.store.RethinkDBMapping;
+import org.apache.gora.rethinkdb.store.RethinkDBStoreParameters;
import org.apache.gora.store.DataStore;
-
/**
* RethinkDB specific implementation of the {@link org.apache.gora.query.Query} interface.
*/
public class RethinkDBQuery<K, T extends PersistentBase> extends QueryBase<K, T> {
+ public static final RethinkDB r = RethinkDB.r;
+ private ReqlExpr dbQuery;
+
public RethinkDBQuery() {
super(null);
}
@@ -36,4 +42,58 @@
super(dataStore);
}
+ public Object populateRethinkDBQuery(final RethinkDBMapping rethinkDBMapping,
+ final RethinkDBStoreParameters rethinkDBStoreParameters,
+ final String[] fields,
+ final String[] schemaFields) {
+ if ((this.getStartKey() != null) && (this.getEndKey() != null)
+ && this.getStartKey().equals(this.getEndKey())) {
+ dbQuery = r.db(rethinkDBStoreParameters.getDatabaseName())
+ .table(rethinkDBMapping.getDocumentClass())
+ .filter(row -> row.g("id").eq(this.getStartKey()));
+ } else if (this.getStartKey() != null || this.getEndKey() != null) {
+ if (this.getStartKey() != null
+ && this.getEndKey() == null) {
+ dbQuery = r.db(rethinkDBStoreParameters.getDatabaseName())
+ .table(rethinkDBMapping.getDocumentClass())
+ .filter(row -> row.g("id").ge(this.getStartKey()));
+ } else if (this.getEndKey() != null
+ && this.getStartKey() == null) {
+ dbQuery = r.db(rethinkDBStoreParameters.getDatabaseName())
+ .table(rethinkDBMapping.getDocumentClass())
+ .filter(row -> row.g("id").le(this.getEndKey()));
+ } else {
+ dbQuery = r.db(rethinkDBStoreParameters.getDatabaseName())
+ .table(rethinkDBMapping.getDocumentClass())
+ .filter(row -> row.g("id").ge(this.getStartKey()).
+ and(row.g("id").le(this.getEndKey())));
+ }
+ } else {
+ dbQuery = r.db(rethinkDBStoreParameters.getDatabaseName())
+ .table(rethinkDBMapping.getDocumentClass());
+ }
+
+ if (fields.length == schemaFields.length) {
+ // all
+ } else {
+ String[] projection = new String[fields.length + 1];
+ int counter = 0;
+ for (String k : fields) {
+ String dbFieldName = rethinkDBMapping.getDocumentField(k);
+ if (dbFieldName != null && dbFieldName.length() > 0) {
+ projection[counter] = dbFieldName;
+ counter++;
+ }
+ }
+ projection[counter] = "id";
+ dbQuery = dbQuery.pluck(projection);
+ }
+
+ return dbQuery;
+ }
+
+ public ReqlExpr getRethinkDBDbQuery() {
+ return dbQuery;
+ }
+
}
diff --git a/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/query/RethinkDBResult.java b/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/query/RethinkDBResult.java
index 21bcab1..e5ddbdc 100644
--- a/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/query/RethinkDBResult.java
+++ b/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/query/RethinkDBResult.java
@@ -19,7 +19,9 @@
package org.apache.gora.rethinkdb.query;
import java.io.IOException;
+import java.util.Iterator;
+import com.rethinkdb.model.MapObject;
import org.apache.gora.rethinkdb.store.RethinkDBStore;
import org.apache.gora.persistency.impl.PersistentBase;
import org.apache.gora.query.Query;
@@ -30,38 +32,65 @@
/**
* RethinkDB specific implementation of the {@link org.apache.gora.query.Result} interface.
- *
*/
public class RethinkDBResult<K, T extends PersistentBase> extends ResultBase<K, T> {
+ private com.rethinkdb.net.Result<MapObject> resultSet;
private int size;
+ private Iterator<MapObject> resultSetIterator;
private static final Logger log = LoggerFactory.getLogger(RethinkDBResult.class);
public RethinkDBResult(DataStore<K, T> dataStore, Query<K, T> query) {
super(dataStore, query);
}
+ public RethinkDBResult(DataStore<K, T> dataStore,
+ Query<K, T> query,
+ com.rethinkdb.net.Result<MapObject> resultSet) {
+ super(dataStore, query);
+ this.resultSet = resultSet;
+ this.resultSetIterator = resultSet.iterator();
+ this.size = resultSet.bufferedCount();
+ }
+
public RethinkDBStore<K, T> getDataStore() {
return (RethinkDBStore<K, T>) super.getDataStore();
}
@Override
public float getProgress() throws IOException {
- return 0L;
+ if (resultSet == null) {
+ return 0;
+ } else if (size == 0) {
+ return 1;
+ } else {
+ return offset / (float) size;
+ }
}
@Override
public void close() throws IOException {
+ resultSet.close();
}
@Override
protected boolean nextInner() throws IOException {
- return true;
+ if (!resultSetIterator.hasNext()) {
+ return false;
+ }
+
+ MapObject<String, Object> obj = resultSetIterator.next();
+ key = (K) obj.get("id");
+ persistent = ((RethinkDBStore<K, T>) getDataStore())
+ .convertRethinkDBDocToAvroBean(obj, getQuery().getFields());
+ return persistent != null;
}
@Override
public int size() {
- return 0;
+ int totalSize = size;
+ int intLimit = (int) this.limit;
+ return intLimit > 0 && totalSize > intLimit ? intLimit : totalSize;
}
}
diff --git a/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/store/RethinkDBStore.java b/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/store/RethinkDBStore.java
index cb5d81a..9a50704 100644
--- a/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/store/RethinkDBStore.java
+++ b/gora-rethinkdb/src/main/java/org/apache/gora/rethinkdb/store/RethinkDBStore.java
@@ -24,11 +24,13 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.Properties;
import java.util.List;
import java.util.ArrayList;
import com.rethinkdb.RethinkDB;
+import com.rethinkdb.gen.ast.ReqlExpr;
import com.rethinkdb.model.MapObject;
import com.rethinkdb.net.Connection;
import org.apache.avro.Schema;
@@ -42,6 +44,7 @@
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.query.impl.PartitionQueryImpl;
+import org.apache.gora.rethinkdb.query.RethinkDBResult;
import org.apache.gora.store.impl.DataStoreBase;
import org.apache.gora.util.AvroUtils;
import org.apache.gora.util.ClassLoadingUtils;
@@ -176,7 +179,7 @@
try {
boolean isExists = r.db(rethinkDBStoreParameters.getDatabaseName())
.table(rethinkDBMapping.getDocumentClass())
- .get(key)
+ .getAll(key)
.count()
.run(connection, Boolean.class)
.first();
@@ -256,7 +259,43 @@
*/
@Override
public long deleteByQuery(Query<K, T> query) throws GoraException {
- return 0L;
+ if (query.getFields() == null || (query.getFields().length == getFields().length)) {
+ String[] fields = getFieldsToQuery(query.getFields());
+ RethinkDBQuery dataStoreQuery = ((RethinkDBQuery) query);
+ dataStoreQuery.populateRethinkDBQuery(rethinkDBMapping, rethinkDBStoreParameters, fields, getFields());
+ ReqlExpr reqlExpr = dataStoreQuery.getRethinkDBDbQuery();
+ MapObject<String, Object> document = reqlExpr.delete().run(connection, MapObject.class).first();
+ int deleteCount = Integer.valueOf(document.get("deleted").toString());
+ if (deleteCount > 0) {
+ return deleteCount;
+ } else {
+ return 0;
+ }
+ } else {
+ RethinkDBQuery<K, T> dataStoreQuery = new RethinkDBQuery<>(this);
+ dataStoreQuery.setStartKey(query.getStartKey());
+ dataStoreQuery.setEndKey(query.getEndKey());
+ dataStoreQuery.populateRethinkDBQuery(rethinkDBMapping, rethinkDBStoreParameters,
+ getFieldsToQuery(null), getFields());
+ ReqlExpr reqlExpr = dataStoreQuery.getRethinkDBDbQuery();
+ String[] projection = new String[query.getFields().length];
+ int counter = 0;
+ for (String k : query.getFields()) {
+ String dbFieldName = rethinkDBMapping.getDocumentField(k);
+ if (dbFieldName != null && dbFieldName.length() > 0) {
+ projection[counter] = dbFieldName;
+ counter++;
+ }
+ }
+ MapObject<String, Object> document = reqlExpr.replace(row -> row.without(projection))
+ .run(connection, MapObject.class).first();
+ int replacedCount = Integer.valueOf(document.get("replaced").toString());
+ if (replacedCount > 0) {
+ return replacedCount;
+ } else {
+ return 0;
+ }
+ }
}
/**
@@ -264,8 +303,18 @@
*/
@Override
public Result<K, T> execute(Query<K, T> query) throws GoraException {
+ String[] fields = getFieldsToQuery(query.getFields());
+ RethinkDBQuery dataStoreQuery;
+ if (query instanceof RethinkDBQuery) {
+ dataStoreQuery = ((RethinkDBQuery) query);
+ } else {
+ dataStoreQuery = (RethinkDBQuery) ((PartitionQueryImpl<K, T>) query).getBaseQuery();
+ }
+ dataStoreQuery.populateRethinkDBQuery(rethinkDBMapping, rethinkDBStoreParameters, fields, getFields());
try {
- return null;
+ ReqlExpr reqlExpr = dataStoreQuery.getRethinkDBDbQuery();
+ com.rethinkdb.net.Result<MapObject> result = reqlExpr.run(connection, MapObject.class);
+ return new RethinkDBResult<>(this, query, result);
} catch (Exception e) {
throw new GoraException(e);
}
@@ -542,7 +591,8 @@
result = convertDocFieldToAvroList(docf, fieldSchema, obj, field, storeType);
break;
case RECORD:
- MapObject<String, Object> record = (MapObject<String, Object>) obj.get(docf);
+ MapObject<String, Object> record = (MapObject<String, Object>)
+ decorateMapToODoc((Map<String, Object>) obj.get(docf));
if (record == null) {
result = null;
break;
@@ -655,7 +705,8 @@
final Schema.Field f,
final RethinkDBMapping.DocumentFieldType storeType) throws GoraException {
- if (storeType == RethinkDBMapping.DocumentFieldType.LIST) {
+ if (storeType == RethinkDBMapping.DocumentFieldType.LIST
+ || storeType == null) {
List<Object> list = (List<Object>) doc.get(docf);
List<Object> rlist = new ArrayList<>();
if (list == null) {
@@ -692,7 +743,8 @@
}
return new DirtyMapWrapper<>(rmap);
} else {
- MapObject<String, Object> innerDoc = (MapObject<String, Object>) doc.get(docf);
+ MapObject<String, Object> innerDoc = (MapObject<String, Object>)
+ decorateMapToODoc((Map<String, Object>) doc.get(docf));
Map<Utf8, Object> rmap = new HashMap<>();
if (innerDoc == null) {
return new DirtyMapWrapper(rmap);
@@ -709,6 +761,9 @@
}
private MapObject<String, Object> decorateMapToODoc(Map<String, Object> map) {
+ if (Objects.isNull(map)) {
+ return null;
+ }
MapObject<String, Object> doc = new MapObject();
for (Map.Entry entry : map.entrySet()) {
doc.put(entry.getKey().toString(), entry.getValue());
diff --git a/gora-rethinkdb/src/test/java/org/apache/gora/rethinkdb/store/RethinkDBGoraDataStoreTest.java b/gora-rethinkdb/src/test/java/org/apache/gora/rethinkdb/store/RethinkDBGoraDataStoreTest.java
index aecfdd3..0270da2 100644
--- a/gora-rethinkdb/src/test/java/org/apache/gora/rethinkdb/store/RethinkDBGoraDataStoreTest.java
+++ b/gora-rethinkdb/src/test/java/org/apache/gora/rethinkdb/store/RethinkDBGoraDataStoreTest.java
@@ -18,17 +18,14 @@
package org.apache.gora.rethinkdb.store;
-
import org.apache.gora.rethinkdb.RethinkDBTestDriver;
import org.apache.gora.store.DataStoreTestBase;
-import org.apache.gora.store.DataStoreTestUtil;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
-import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.FixedHostPortGenericContainer;
@@ -72,13 +69,6 @@
super.tearDown();
}
- @Override
- @Test
- public void testGet() throws Exception {
- log.info("test method: testGet");
- DataStoreTestUtil.testGetEmployee(this.employeeStore);
- }
-
@Ignore("3 types union field is not supported by OrientDBStore.")
@Override
public void testGet3UnionField() throws Exception {