Updates for Mongo aggregation
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java
index 6996fcd..fda8538 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java
@@ -95,6 +95,8 @@
* false.
*/
public class AggregationPipelineQueryNode extends ExternalSet {
+ private static final long serialVersionUID = 1L;
+
/**
* An aggregation result corresponding to a solution should map this key
* to an object which itself maps variable names to variable values.
@@ -230,8 +232,12 @@
}
final List<Bson> fields = new LinkedList<>();
fields.add(Projections.excludeId());
- fields.add(Projections.computed(VALUES, values));
- fields.add(Projections.computed(HASHES, hashes));
+ if (!values.isEmpty()) {
+ fields.add(Projections.computed(VALUES, values));
+ }
+ if (!hashes.isEmpty()) {
+ fields.add(Projections.computed(HASHES, hashes));
+ }
if (!types.isEmpty()) {
fields.add(Projections.computed(TYPES, types));
}
@@ -778,8 +784,7 @@
*/
public void requireSourceDerivationDepth(final int requiredLevel) {
if (requiredLevel > 0) {
- pipeline.add(Aggregates.match(new Document(LEVEL,
- new Document("$gte", requiredLevel))));
+ pipeline.add(Aggregates.match(Filters.gte(LEVEL, requiredLevel)));
}
}
@@ -794,8 +799,7 @@
* timestamp than this.
*/
public void requireSourceTimestamp(final long t) {
- pipeline.add(Aggregates.match(new Document(TIMESTAMP,
- new Document("$gte", t))));
+ pipeline.add(Aggregates.match(Filters.gte(TIMESTAMP, t)));
}
/**
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
index 462da1c..179b3d4 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/iter/RyaStatementBindingSetCursorIterator.java
@@ -34,16 +34,19 @@
import org.apache.rya.mongodb.dao.MongoDBStorageStrategy;
import org.apache.rya.mongodb.document.operators.aggregation.AggregationUtil;
import org.bson.Document;
+import org.bson.conversions.Bson;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.query.BindingSet;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Iterators;
import com.google.common.collect.Multimap;
+import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MongoCollection;
-import com.mongodb.util.JSON;
+import com.mongodb.client.model.Aggregates;
+import com.mongodb.client.model.Filters;
public class RyaStatementBindingSetCursorIterator implements CloseableIteration<Entry<RyaStatement, BindingSet>, RyaDAOException> {
private static final Logger log = Logger.getLogger(RyaStatementBindingSetCursorIterator.class);
@@ -102,7 +105,7 @@
if (currentBatchQueryResultCursorIsValid()) {
// convert to Rya Statement
final Document queryResult = batchQueryResultsIterator.next();
- final DBObject dbo = (DBObject) JSON.parse(queryResult.toJson());
+ final DBObject dbo = BasicDBObject.parse(queryResult.toJson());
currentResultStatement = strategy.deserializeDBObject(dbo);
// Find all of the queries in the executed RangeMap that this result matches
@@ -136,21 +139,23 @@
private void submitBatchQuery() {
int count = 0;
executedRangeMap.clear();
- final List<Document> pipeline = new ArrayList<>();
- final List<DBObject> match = new ArrayList<>();
+ final List<Bson> pipeline = new ArrayList<>();
+ final List<Bson> matches = new ArrayList<>();
while (queryIterator.hasNext() && count < QUERY_BATCH_SIZE){
count++;
final RyaStatement query = queryIterator.next();
executedRangeMap.putAll(query, rangeMap.get(query));
final DBObject currentQuery = strategy.getQuery(query);
- match.add(currentQuery);
+ final Document doc = Document.parse(currentQuery.toString());
+ matches.add(doc);
}
- if (match.size() > 1) {
- pipeline.add(new Document("$match", new Document("$or", match)));
- } else if (match.size() == 1) {
- pipeline.add(new Document("$match", match.get(0)));
+ final int numMatches = matches.size();
+ if (numMatches > 1) {
+ pipeline.add(Aggregates.match(Filters.or(matches)));
+ } else if (numMatches == 1) {
+ pipeline.add(Aggregates.match(matches.get(0)));
} else {
batchQueryResultsIterator = Iterators.emptyIterator();
return;
diff --git a/extras/indexing/src/main/java/org/apache/rya/indexing/statement/metadata/matching/StatementMetadataNode.java b/extras/indexing/src/main/java/org/apache/rya/indexing/statement/metadata/matching/StatementMetadataNode.java
index a5f8b3a..439d06a 100644
--- a/extras/indexing/src/main/java/org/apache/rya/indexing/statement/metadata/matching/StatementMetadataNode.java
+++ b/extras/indexing/src/main/java/org/apache/rya/indexing/statement/metadata/matching/StatementMetadataNode.java
@@ -8,9 +8,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -37,9 +37,9 @@
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.api.RdfCloudTripleStoreUtils;
+import org.apache.rya.api.domain.RyaIRI;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaType;
-import org.apache.rya.api.domain.RyaIRI;
import org.apache.rya.api.domain.StatementMetadata;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.persist.query.RyaQueryEngine;
@@ -81,7 +81,7 @@
* expensive from a storage perspective. It is also expensive from a query
* perspective in that three joins are required to evaluate a query that is
* reduced to a single scan in non-reified form.
- *
+ *
* This class provides Rya with the ability to issue reified queries even though
* statements are not reified. Each {@link RyaStatement} contains a
* {@link StatementMetadata} field that allows users to store additional
@@ -112,18 +112,18 @@
private StatementPattern statement;
private Map<RyaIRI, Var> properties;
- private Collection<StatementPattern> patterns;
- private List<RyaIRI> uriList = Arrays.asList(TYPE_ID_URI, SUBJ_ID_URI, PRED_ID_URI, OBJ_ID_URI);
- private C conf;
+ private final Collection<StatementPattern> patterns;
+ private final List<RyaIRI> uriList = Arrays.asList(TYPE_ID_URI, SUBJ_ID_URI, PRED_ID_URI, OBJ_ID_URI);
+ private final C conf;
private Set<String> bindingNames;
private RyaQueryEngine<C> queryEngine;
- public StatementMetadataNode(final Collection<StatementPattern> patterns, C conf) {
+ public StatementMetadataNode(final Collection<StatementPattern> patterns, final C conf) {
this.conf = conf;
this.patterns = patterns;
verifySameSubjects(patterns);
verifyAllPredicatesAreConstants(patterns);
- boolean correctForm = verifyHasCorrectTypePattern(patterns);
+ final boolean correctForm = verifyHasCorrectTypePattern(patterns);
if (!correctForm) {
throw new IllegalArgumentException("Invalid reified StatementPatterns.");
}
@@ -132,7 +132,7 @@
/**
* Get {@link StatementPattern}s representing the underlying reified query.
- *
+ *
* @return Collection of StatementPatterns
*/
public Collection<StatementPattern> getReifiedStatementPatterns() {
@@ -148,7 +148,7 @@
* @throws IllegalStateException
* If all of the Subjects are not the same.
*/
- private static void verifySameSubjects(Collection<StatementPattern> patterns) throws IllegalStateException {
+ private static void verifySameSubjects(final Collection<StatementPattern> patterns) throws IllegalStateException {
requireNonNull(patterns);
final Iterator<StatementPattern> it = patterns.iterator();
@@ -204,7 +204,7 @@
boolean valid = true;
boolean contextSet = false;
Var context = null;
-
+
for (final StatementPattern pattern : patterns) {
final RyaIRI predicate = new RyaIRI(pattern.getPredicateVar().getValue().toString());
@@ -216,15 +216,20 @@
return false;
}
}
-
+
if (predicate.equals(TYPE_ID_URI)) {
- final RyaIRI statementID = new RyaIRI(pattern.getObjectVar().getValue().stringValue());
- if (statementID.equals(STATEMENT_ID_URI)) {
- statementFound = true;
+ final Value objectValue = pattern.getObjectVar().getValue();
+ if (objectValue != null) {
+ final RyaIRI statementID = new RyaIRI(objectValue.stringValue());
+ if (statementID.equals(STATEMENT_ID_URI)) {
+ statementFound = true;
+ } else {
+ // contains more than one Statement containing TYPE_ID_URI
+ // as Predicate
+ // and STATEMENT_ID_URI as Object
+ valid = false;
+ }
} else {
- // contains more than one Statement containing TYPE_ID_URI
- // as Predicate
- // and STATEMENT_ID_URI as Object
valid = false;
}
}
@@ -274,20 +279,20 @@
* the user specified metadata properties and is used for comparison with
* the metadata properties extracted from RyaStatements passed back by the
* {@link RyaQueryEngine}.
- *
+ *
* @param patterns
* - collection of patterns representing a reified query
*/
- private void setStatementPatternAndProperties(Collection<StatementPattern> patterns) {
+ private void setStatementPatternAndProperties(final Collection<StatementPattern> patterns) {
- StatementPattern sp = new StatementPattern();
- Map<RyaIRI, Var> properties = new HashMap<>();
+ final StatementPattern sp = new StatementPattern();
+ final Map<RyaIRI, Var> properties = new HashMap<>();
for (final StatementPattern pattern : patterns) {
final RyaIRI predicate = new RyaIRI(pattern.getPredicateVar().getValue().toString());
if (!uriList.contains(predicate)) {
- Var objVar = pattern.getObjectVar();
+ final Var objVar = pattern.getObjectVar();
properties.put(predicate, objVar);
continue;
}
@@ -315,17 +320,17 @@
* {@link RyaQueryEngine}.
*/
@Override
- public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(Collection<BindingSet> bindingset)
+ public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final Collection<BindingSet> bindingset)
throws QueryEvaluationException {
if (bindingset.size() == 0) {
return new EmptyIteration<>();
}
queryEngine = RyaQueryEngineFactory.getQueryEngine(conf);
- Set<Map.Entry<RyaStatement, BindingSet>> statements = new HashSet<>();
- Iterator<BindingSet> iter = bindingset.iterator();
+ final Set<Map.Entry<RyaStatement, BindingSet>> statements = new HashSet<>();
+ final Iterator<BindingSet> iter = bindingset.iterator();
while (iter.hasNext()) {
- BindingSet bs = iter.next();
+ final BindingSet bs = iter.next();
statements.add(new RdfCloudTripleStoreUtils.CustomEntry<RyaStatement, BindingSet>(
getRyaStatementFromBindings(bs), bs));
}
@@ -333,7 +338,7 @@
final CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> iteration;
try {
iteration = queryEngine.queryWithBindingSet(statements, conf);
- } catch (RyaDAOException e) {
+ } catch (final RyaDAOException e) {
throw new RuntimeException(e);
}
@@ -344,17 +349,17 @@
* Uses StatementPattern constraints to form a RyaStatement, and fills in
* any null values with {@link BindingSet} values corresponding to the
* variable for that position.
- *
+ *
* @param bs
* @return RyaStatement whose values are determined by StatementPattern and
* BindingSet constraints
*/
- private RyaStatement getRyaStatementFromBindings(BindingSet bs) {
+ private RyaStatement getRyaStatementFromBindings(final BindingSet bs) {
- Value subjValue = getVarValue(statement.getSubjectVar(), bs);
- Value predValue = getVarValue(statement.getPredicateVar(), bs);
- Value objValue = getVarValue(statement.getObjectVar(), bs);
- Value contextValue = getVarValue(statement.getContextVar(), bs);
+ final Value subjValue = getVarValue(statement.getSubjectVar(), bs);
+ final Value predValue = getVarValue(statement.getPredicateVar(), bs);
+ final Value objValue = getVarValue(statement.getObjectVar(), bs);
+ final Value contextValue = getVarValue(statement.getContextVar(), bs);
RyaIRI subj = null;
RyaIRI pred = null;
RyaType obj = null;
@@ -373,7 +378,7 @@
if (objValue != null) {
obj = RdfToRyaConversions.convertValue(objValue);
}
-
+
if(contextValue != null) {
context = RdfToRyaConversions.convertIRI((IRI) contextValue);
}
@@ -386,12 +391,12 @@
* otherwise returns the BindingSet Value corresponding to
* {@link Var#getName()}. If no such Binding exits, this method returns
* null.
- *
+ *
* @param var
* @param bindings
* @return Value
*/
- private Value getVarValue(Var var, BindingSet bindings) {
+ private Value getVarValue(final Var var, final BindingSet bindings) {
if (var == null) {
return null;
} else if (var.hasValue()) {
@@ -402,20 +407,20 @@
}
@Override
- public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(BindingSet bindings)
+ public CloseableIteration<BindingSet, QueryEvaluationException> evaluate(final BindingSet bindings)
throws QueryEvaluationException {
return evaluate(Collections.singleton(bindings));
}
@Override
- public boolean equals(Object other) {
+ public boolean equals(final Object other) {
if (this == other) {
return true;
}
if (other instanceof StatementMetadataNode) {
- StatementMetadataNode<?> meta = (StatementMetadataNode<?>) other;
+ final StatementMetadataNode<?> meta = (StatementMetadataNode<?>) other;
if (meta.patterns.size() != this.patterns.size()) {
return false;
}
@@ -424,8 +429,8 @@
return false;
}
- Set<StatementPattern> thisSet = new HashSet<>(patterns);
- Set<StatementPattern> thatSet = new HashSet<>(meta.patterns);
+ final Set<StatementPattern> thisSet = new HashSet<>(patterns);
+ final Set<StatementPattern> thatSet = new HashSet<>(meta.patterns);
return thisSet.equals(thatSet);
} else {
return false;
@@ -435,7 +440,7 @@
@Override
public int hashCode() {
int hashcode = 0;
- for (StatementPattern sp : patterns) {
+ for (final StatementPattern sp : patterns) {
hashcode += sp.hashCode();
}
return hashcode;
@@ -465,9 +470,9 @@
}
private Set<String> getVariableNames() {
- Set<String> vars = new HashSet<>();
- for (StatementPattern pattern : patterns) {
- for (Var var : pattern.getVarList()) {
+ final Set<String> vars = new HashSet<>();
+ for (final StatementPattern pattern : patterns) {
+ for (final Var var : pattern.getVarList()) {
if (var.getValue() == null) {
vars.add(var.getName());
}
@@ -495,16 +500,16 @@
*/
class PropertyFilterAndBindingSetJoinIteration implements CloseableIteration<BindingSet, QueryEvaluationException> {
- private CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> statements;
- private Map<RyaIRI, Var> properties;
- private StatementPattern sp;
+ private final CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> statements;
+ private final Map<RyaIRI, Var> properties;
+ private final StatementPattern sp;
private BindingSet next;
private boolean hasNextCalled = false;
private boolean hasNext = false;
public PropertyFilterAndBindingSetJoinIteration(
- CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> statements,
- Map<RyaIRI, Var> properties, StatementPattern sp) {
+ final CloseableIteration<? extends Entry<RyaStatement, BindingSet>, RyaDAOException> statements,
+ final Map<RyaIRI, Var> properties, final StatementPattern sp) {
this.statements = statements;
this.properties = properties;
this.sp = sp;
@@ -562,7 +567,7 @@
public void close() throws QueryEvaluationException {
try {
statements.close();
- } catch (RyaDAOException e) {
+ } catch (final RyaDAOException e) {
throw new QueryEvaluationException(e);
}
}
@@ -570,14 +575,14 @@
/**
* Fast-forwards Iteration to next valid Entry and builds the
* BindingSet.
- *
+ *
* @return BindingSet
* @throws RyaDAOException
*/
private Optional<BindingSet> getNext() throws RyaDAOException {
Optional<BindingSet> optionalBs = Optional.empty();
while (statements.hasNext() && !optionalBs.isPresent()) {
- Map.Entry<RyaStatement, BindingSet> next = statements.next();
+ final Map.Entry<RyaStatement, BindingSet> next = statements.next();
optionalBs = buildBindingSet(next.getKey(), next.getValue());
}
return optionalBs;
@@ -590,7 +595,7 @@
* {@link StatementMetadata} properties for the specified RyaStatement
* and if the BindingSet built form the StatementMetadata properties can
* be joined with specified BindingSet.
- *
+ *
* @param statement
* - RyaStatement
* @param bindingSet
@@ -598,15 +603,15 @@
* @return - Optional containing BindingSet is a valid BindingSet could
* be built
*/
- private Optional<BindingSet> buildBindingSet(RyaStatement statement, BindingSet bindingSet) {
+ private Optional<BindingSet> buildBindingSet(final RyaStatement statement, final BindingSet bindingSet) {
- QueryBindingSet bs = new QueryBindingSet();
- Optional<BindingSet> optPropBs = buildPropertyBindingSet(statement);
+ final QueryBindingSet bs = new QueryBindingSet();
+ final Optional<BindingSet> optPropBs = buildPropertyBindingSet(statement);
if (!optPropBs.isPresent()) {
return Optional.empty();
}
- BindingSet propBs = optPropBs.get();
- BindingSet spBs = buildBindingSetFromStatementPattern(statement);
+ final BindingSet propBs = optPropBs.get();
+ final BindingSet spBs = buildBindingSetFromStatementPattern(statement);
if (!canJoinBindingSets(spBs, propBs)) {
return Optional.empty();
}
@@ -625,24 +630,24 @@
* StatementMetadata properties for specified RyaStatement. If
* consistent, this method builds the associated BindingSet otherwise an
* empty Optional is returned.
- *
+ *
* @param statement
* @return
*/
- private Optional<BindingSet> buildPropertyBindingSet(RyaStatement statement) {
- StatementMetadata metadata = statement.getMetadata();
- Map<RyaIRI, RyaType> statementProps = metadata.getMetadata();
+ private Optional<BindingSet> buildPropertyBindingSet(final RyaStatement statement) {
+ final StatementMetadata metadata = statement.getMetadata();
+ final Map<RyaIRI, RyaType> statementProps = metadata.getMetadata();
if (statementProps.size() < properties.size()) {
return Optional.empty();
}
- QueryBindingSet bs = new QueryBindingSet();
- for (Map.Entry<RyaIRI, Var> entry : properties.entrySet()) {
- RyaIRI key = entry.getKey();
- Var var = entry.getValue();
+ final QueryBindingSet bs = new QueryBindingSet();
+ for (final Map.Entry<RyaIRI, Var> entry : properties.entrySet()) {
+ final RyaIRI key = entry.getKey();
+ final Var var = entry.getValue();
if (!statementProps.containsKey(key)) {
return Optional.empty();
} else {
- Value val = RyaToRdfConversions.convertValue(statementProps.get(key));
+ final Value val = RyaToRdfConversions.convertValue(statementProps.get(key));
if (var.getValue() == null) {
bs.addBinding(var.getName(), val);
} else if (!var.getValue().equals(val)) {
@@ -661,16 +666,16 @@
* If it doesn't have a Value, a Binding is created from the
* RyaStatement using the {@link RyaType} for the corresponding position
* (Subject, Predicate, Object).
- *
+ *
* @param statement
* @return BindingSet
*/
- private BindingSet buildBindingSetFromStatementPattern(RyaStatement statement) {
- Var subjVar = sp.getSubjectVar();
- Var predVar = sp.getPredicateVar();
- Var objVar = sp.getObjectVar();
- Var contextVar = sp.getContextVar();
- QueryBindingSet bs = new QueryBindingSet();
+ private BindingSet buildBindingSetFromStatementPattern(final RyaStatement statement) {
+ final Var subjVar = sp.getSubjectVar();
+ final Var predVar = sp.getPredicateVar();
+ final Var objVar = sp.getObjectVar();
+ final Var contextVar = sp.getContextVar();
+ final QueryBindingSet bs = new QueryBindingSet();
if (subjVar.getValue() == null) {
bs.addBinding(subjVar.getName(), RyaToRdfConversions.convertValue(statement.getSubject()));
@@ -683,7 +688,7 @@
if (objVar.getValue() == null) {
bs.addBinding(objVar.getName(), RyaToRdfConversions.convertValue(statement.getObject()));
}
-
+
if (contextVar != null && contextVar.getValue() == null) {
bs.addBinding(contextVar.getName(), RyaToRdfConversions.convertValue(statement.getContext()));
}
@@ -691,10 +696,10 @@
return bs;
}
- private boolean canJoinBindingSets(BindingSet bs1, BindingSet bs2) {
- for (Binding b : bs1) {
- String name = b.getName();
- Value val = b.getValue();
+ private boolean canJoinBindingSets(final BindingSet bs1, final BindingSet bs2) {
+ for (final Binding b : bs1) {
+ final String name = b.getName();
+ final Value val = b.getValue();
if (bs2.hasBinding(name) && (!bs2.getValue(name).equals(val))) {
return false;
}
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/MongoPipelineStrategy.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/MongoPipelineStrategy.java
index 95ad841..4e80428 100644
--- a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/MongoPipelineStrategy.java
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/MongoPipelineStrategy.java
@@ -50,12 +50,11 @@
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import com.google.common.base.Preconditions;
-import com.mongodb.Block;
+import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
-import com.mongodb.util.JSON;
/**
* A rule execution strategy for MongoDB Rya that converts a single rule into an
@@ -83,7 +82,7 @@
* passed a stateful configuration, uses the existing mongo client,
* otherwise creates one.
*/
- public MongoPipelineStrategy(MongoDBRdfConfiguration mongoConf) throws ForwardChainException {
+ public MongoPipelineStrategy(final MongoDBRdfConfiguration mongoConf) throws ForwardChainException {
Preconditions.checkNotNull(mongoConf);
final String mongoDBName = mongoConf.getMongoDBName();
final String collectionName = mongoConf.getTriplesCollectionName();
@@ -100,7 +99,7 @@
this.dao = RyaSailFactory.getMongoDAO(mongoConf);
statefulConf = this.dao.getConf();
}
- } catch (RyaDAOException e) {
+ } catch (final RyaDAOException e) {
throw new ForwardChainException("Can't connect to Rya.", e);
}
final MongoClient mongoClient = statefulConf.getMongoClient();
@@ -130,11 +129,11 @@
* @throws ForwardChainException if execution fails.
*/
@Override
- public long executeConstructRule(AbstractConstructRule rule,
- StatementMetadata metadata) throws ForwardChainException {
+ public long executeConstructRule(final AbstractConstructRule rule,
+ final StatementMetadata metadata) throws ForwardChainException {
Preconditions.checkNotNull(rule);
logger.info("Applying inference rule " + rule + "...");
- long timestamp = System.currentTimeMillis();
+ final long timestamp = System.currentTimeMillis();
// Get a pipeline that turns individual matches into triples
List<Bson> pipeline = null;
try {
@@ -149,20 +148,20 @@
}
pipeline = toPipeline(rule, requireSourceLevel, timestamp);
}
- catch (ForwardChainException e) {
+ catch (final ForwardChainException e) {
logger.error(e);
}
if (pipeline == null) {
if (backup == null) {
logger.error("Couldn't convert " + rule + " to pipeline:");
- for (String line : rule.getQuery().toString().split("\n")) {
+ for (final String line : rule.getQuery().toString().split("\n")) {
logger.error("\t" + line);
}
throw new UnsupportedOperationException("Couldn't convert query to pipeline.");
}
else {
logger.debug("Couldn't convert " + rule + " to pipeline:");
- for (String line : rule.getQuery().toString().split("\n")) {
+ for (final String line : rule.getQuery().toString().split("\n")) {
logger.debug("\t" + line);
}
logger.debug("Using fallback strategy.");
@@ -171,32 +170,30 @@
}
}
// Execute the pipeline
- for (Bson step : pipeline) {
+ for (final Bson step : pipeline) {
logger.debug("\t" + step.toString());
}
- LongAdder count = new LongAdder();
+ final LongAdder count = new LongAdder();
baseCollection.aggregate(pipeline)
.allowDiskUse(true)
.batchSize(PIPELINE_BATCH_SIZE)
- .forEach(new Block<Document>() {
- @Override
- public void apply(Document doc) {
- final DBObject dbo = (DBObject) JSON.parse(doc.toJson());
- RyaStatement rstmt = storageStrategy.deserializeDBObject(dbo);
- if (!statementExists(rstmt)) {
- count.increment();
- doc.replace(SimpleMongoDBStorageStrategy.STATEMENT_METADATA, metadata.toString());
- try {
- batchWriter.addObjectToQueue(doc);
- } catch (MongoDbBatchWriterException e) {
- logger.error("Couldn't insert " + rstmt, e);
- }
+ .forEach((final Document doc) -> {
+ final DBObject dbo = BasicDBObject.parse(doc.toJson());
+ final RyaStatement rstmt = storageStrategy.deserializeDBObject(dbo);
+ if (!statementExists(rstmt)) {
+ count.increment();
+ doc.replace(SimpleMongoDBStorageStrategy.STATEMENT_METADATA, metadata.toString());
+ try {
+ batchWriter.addObjectToQueue(doc);
+ } catch (final MongoDbBatchWriterException e) {
+ logger.error("Couldn't insert " + rstmt, e);
}
}
});
+
try {
batchWriter.flush();
- } catch (MongoDbBatchWriterException e) {
+ } catch (final MongoDbBatchWriterException e) {
throw new ForwardChainException("Error writing to Mongo", e);
}
logger.info("Added " + count + " new statements.");
@@ -211,10 +208,10 @@
return count.longValue();
}
- private boolean statementExists(RyaStatement rstmt) {
+ private boolean statementExists(final RyaStatement rstmt) {
try {
return engine.query(new RyaQuery(rstmt)).iterator().hasNext();
- } catch (RyaDAOException e) {
+ } catch (final RyaDAOException e) {
logger.error("Error querying for " + rstmt, e);
return false;
}
@@ -231,7 +228,7 @@
backup.shutDown();
try {
batchWriter.shutdown();
- } catch (MongoDbBatchWriterException e) {
+ } catch (final MongoDbBatchWriterException e) {
throw new ForwardChainException("Error shutting down batch writer", e);
}
}
@@ -247,24 +244,24 @@
* @return An aggregation pipeline.
* @throws ForwardChainException if pipeline construction fails.
*/
- private List<Bson> toPipeline(AbstractConstructRule rule, int sourceLevel,
- long timestamp) throws ForwardChainException {
+ private List<Bson> toPipeline(final AbstractConstructRule rule, final int sourceLevel,
+ final long timestamp) throws ForwardChainException {
TupleExpr tupleExpr = rule.getQuery().getTupleExpr();
if (!(tupleExpr instanceof QueryRoot)) {
tupleExpr = new QueryRoot(tupleExpr);
}
try {
tupleExpr.visit(pipelineVisitor);
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new ForwardChainException("Error converting construct rule to an aggregation pipeline", e);
}
if (tupleExpr instanceof QueryRoot) {
- QueryRoot root = (QueryRoot) tupleExpr;
+ final QueryRoot root = (QueryRoot) tupleExpr;
if (root.getArg() instanceof AggregationPipelineQueryNode) {
- AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) root.getArg();
+ final AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) root.getArg();
pipelineNode.distinct(); // require distinct triples
pipelineNode.requireSourceDerivationDepth(sourceLevel);
- long latestTime = executionTimes.getOrDefault(rule, 0L);
+ final long latestTime = executionTimes.getOrDefault(rule, 0L);
if (latestTime > 0) {
pipelineNode.requireSourceTimestamp(latestTime);
}
diff --git a/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/batch/MongoSpinIT.java b/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/batch/MongoSpinIT.java
index 84080e5..efa13dc 100644
--- a/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/batch/MongoSpinIT.java
+++ b/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/batch/MongoSpinIT.java
@@ -18,10 +18,13 @@
*/
package org.apache.rya.forwardchain.batch;
+import static org.junit.Assert.assertEquals;
+
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URL;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
@@ -37,10 +40,11 @@
import org.apache.rya.test.mongo.EmbeddedMongoFactory;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
+import org.eclipse.rdf4j.query.AbstractTupleQueryResultHandler;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryLanguage;
import org.eclipse.rdf4j.query.TupleQuery;
-import org.eclipse.rdf4j.query.TupleQueryResult;
+import org.eclipse.rdf4j.query.TupleQueryResultHandlerException;
import org.eclipse.rdf4j.query.impl.ListBindingSet;
import org.eclipse.rdf4j.repository.RepositoryException;
import org.eclipse.rdf4j.repository.sail.SailRepository;
@@ -48,11 +52,10 @@
import org.eclipse.rdf4j.rio.RDFFormat;
import org.eclipse.rdf4j.rio.Rio;
import org.junit.After;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableSet;
import com.google.common.io.Resources;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;
@@ -84,48 +87,52 @@
}
@Test
+ public void testNoStrategy() throws Exception {
+ loadDataFiles();
+ final Set<BindingSet> solutions = executeQuery(Resources.getResource("query.sparql"));
+ final Set<BindingSet> expected = new HashSet<>();
+ assertEquals(expected, solutions);
+ }
+
+ @Test
public void testSailStrategy() throws Exception {
- insertDataFile(Resources.getResource("data.ttl"), "http://example.org#");
- insertDataFile(Resources.getResource("university.ttl"), "http://example.org#");
- insertDataFile(Resources.getResource("owlrl.ttl"), "http://example.org#");
- Set<BindingSet> solutions = executeQuery(Resources.getResource("query.sparql"));
- Set<BindingSet> expected = new HashSet<>();
- Assert.assertEquals(expected, solutions);
+ loadDataFiles();
conf.setUseAggregationPipeline(false);
- ForwardChainSpinTool tool = new ForwardChainSpinTool();
+ final ForwardChainSpinTool tool = new ForwardChainSpinTool();
ToolRunner.run(conf, tool, new String[] {});
- solutions = executeQuery(Resources.getResource("query.sparql"));
- expected.add(new ListBindingSet(Arrays.asList("X", "Y"),
+ final Set<BindingSet> solutions = executeQuery(Resources.getResource("query.sparql"));
+ final Set<BindingSet> expected = ImmutableSet.of(new ListBindingSet(Arrays.asList("X", "Y"),
VF.createIRI(EX, "Alice"), VF.createIRI(EX, "Department1")));
- Assert.assertEquals(expected, solutions);
+ assertEquals(expected, solutions);
// TODO: Check if spin rules with empty WHERE clauses, such as
// rl:scm-cls in the owlrl.ttl test file, should be included.
- Assert.assertEquals(48, tool.getNumInferences());
+ assertEquals(48, tool.getNumInferences());
}
@Test
public void testPipelineStrategy() throws Exception {
+ loadDataFiles();
+ conf.setUseAggregationPipeline(true);
+ final ForwardChainSpinTool tool = new ForwardChainSpinTool();
+ ToolRunner.run(conf, tool, new String[] {});
+ final Set<BindingSet> solutions = executeQuery(Resources.getResource("query.sparql"));
+ final Set<BindingSet> expected = ImmutableSet.of(new ListBindingSet(Arrays.asList("X", "Y"),
+ VF.createIRI(EX, "Alice"), VF.createIRI(EX, "Department1")));
+ assertEquals(expected, solutions);
+ // TODO: Check if spin rules with empty WHERE clauses, such as
+ // rl:scm-cls in the owlrl.ttl test file, should be included.
+ assertEquals(41, tool.getNumInferences());
+ }
+
+ private void loadDataFiles() throws Exception {
insertDataFile(Resources.getResource("data.ttl"), "http://example.org#");
insertDataFile(Resources.getResource("university.ttl"), "http://example.org#");
insertDataFile(Resources.getResource("owlrl.ttl"), "http://example.org#");
- Set<BindingSet> solutions = executeQuery(Resources.getResource("query.sparql"));
- Set<BindingSet> expected = new HashSet<>();
- Assert.assertEquals(expected, solutions);
- conf.setUseAggregationPipeline(true);
- ForwardChainSpinTool tool = new ForwardChainSpinTool();
- ToolRunner.run(conf, tool, new String[] {});
- solutions = executeQuery(Resources.getResource("query.sparql"));
- expected.add(new ListBindingSet(Arrays.asList("X", "Y"),
- VF.createIRI(EX, "Alice"), VF.createIRI(EX, "Department1")));
- Assert.assertEquals(expected, solutions);
- // TODO: Check if spin rules with empty WHERE clauses, such as
- // rl:scm-cls in the owlrl.ttl test file, should be included.
- Assert.assertEquals(41, tool.getNumInferences());
}
- private void insertDataFile(URL dataFile, String defaultNamespace) throws Exception {
- RDFFormat format = Rio.getParserFormatForFileName(dataFile.getFile()).get();
- SailRepositoryConnection conn = repository.getConnection();
+ private void insertDataFile(final URL dataFile, final String defaultNamespace) throws Exception {
+ final RDFFormat format = Rio.getParserFormatForFileName(dataFile.getFile()).get();
+ final SailRepositoryConnection conn = repository.getConnection();
try {
conn.add(dataFile, defaultNamespace, format);
} finally {
@@ -133,29 +140,29 @@
}
}
- private Set<BindingSet> executeQuery(URL queryFile) throws Exception {
- SailRepositoryConnection conn = repository.getConnection();
- try {
- try(
- final InputStream queryIS = queryFile.openStream();
- final BufferedReader br = new BufferedReader(new InputStreamReader(queryIS, Charsets.UTF_8));
- ) {
- final String query = br.lines().collect(Collectors.joining("\n"));
- final TupleQuery tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, query);
- final TupleQueryResult result = tupleQuery.evaluate();
- final Set<BindingSet> solutions = new HashSet<>();
- while (result.hasNext()) {
- solutions.add(result.next());
+ private Set<BindingSet> executeQuery(final URL queryFile) throws Exception {
+ final SailRepositoryConnection conn = repository.getConnection();
+ try(
+ final InputStream queryIS = queryFile.openStream();
+ final BufferedReader br = new BufferedReader(new InputStreamReader(queryIS, StandardCharsets.UTF_8));
+ ) {
+ final String query = br.lines().collect(Collectors.joining("\n"));
+ final TupleQuery tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, query);
+ final Set<BindingSet> solutions = new HashSet<>();
+ tupleQuery.evaluate(new AbstractTupleQueryResultHandler() {
+ @Override
+ public void handleSolution(final BindingSet bindingSet) throws TupleQueryResultHandlerException {
+ solutions.add(bindingSet);
}
- return solutions;
- }
+ });
+ return solutions;
} finally {
closeQuietly(conn);
}
}
private static MongoDBRdfConfiguration getConf() throws Exception {
- MongoDBIndexingConfigBuilder builder = MongoIndexingConfiguration.builder().setUseMockMongo(true);
+ final MongoDBIndexingConfigBuilder builder = MongoIndexingConfiguration.builder().setUseMockMongo(true);
final MongoClient c = EmbeddedMongoFactory.newFactory().newMongoClient();
final ServerAddress address = c.getAddress();
builder.setMongoHost(address.getHost());