blob: 8dbf4b5bd191e02151b63e496ba73dfc578ab5f9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.mongodb.aggregation;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.domain.RyaStatement.RyaStatementBuilder;
import org.apache.rya.api.persist.RyaDAOException;
import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.apache.rya.api.resolver.RyaToRdfConversions;
import org.apache.rya.mongodb.MongoDBRyaDAO;
import org.apache.rya.mongodb.MongoITBase;
import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Literal;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.model.vocabulary.FOAF;
import org.eclipse.rdf4j.model.vocabulary.OWL;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.model.vocabulary.RDFS;
import org.eclipse.rdf4j.model.vocabulary.XMLSchema;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.QueryEvaluationException;
import org.eclipse.rdf4j.query.algebra.Projection;
import org.eclipse.rdf4j.query.algebra.QueryRoot;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet;
import org.eclipse.rdf4j.query.impl.EmptyBindingSet;
import org.eclipse.rdf4j.query.impl.ListBindingSet;
import org.eclipse.rdf4j.query.parser.sparql.SPARQLParser;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.google.common.collect.HashMultiset;
import com.google.common.collect.Multiset;
import com.mongodb.DBObject;
import com.mongodb.util.JSON;
/**
 * Integration tests for {@link SparqlToPipelineTransformVisitor}. Each test stores statements in
 * MongoDB, converts a parsed SPARQL query into an {@link AggregationPipelineQueryNode}, evaluates
 * the resulting aggregation pipeline, and compares the produced solutions with the expected ones.
 */
public class PipelineQueryIT extends MongoITBase {

    private static final ValueFactory VF = SimpleValueFactory.getInstance();
    private static final SPARQLParser PARSER = new SPARQLParser();

    private MongoDBRyaDAO dao;

    @Before
    @Override
    public void setupTest() throws Exception {
        super.setupTest();
        dao = new MongoDBRyaDAO();
        dao.setConf(conf);
        dao.init();
    }

    /**
     * Destroys the DAO initialized in {@link #setupTest()} so it is not left running after the
     * test. Named distinctly so it cannot accidentally override a tear-down method of the base class.
     */
    @After
    public void destroyDao() throws Exception {
        if (dao != null) {
            dao.destroy();
        }
    }

    /**
     * Stores a statement with no derivation level, i.e. through the DAO.
     */
    private void insert(final Resource subject, final IRI predicate, final Value object) throws RyaDAOException {
        insert(subject, predicate, object, 0);
    }

    /**
     * Stores a single statement.
     *
     * <p>When {@code derivationLevel} is positive, the statement is serialized with
     * {@link SimpleMongoDBStorageStrategy}, tagged with a {@code derivation_level} field, and written
     * directly to the Rya collection, bypassing the DAO. Otherwise it is added through the DAO
     * (callers should {@code dao.flush()} before querying).
     */
    private void insert(final Resource subject, final IRI predicate, final Value object, final int derivationLevel)
            throws RyaDAOException {
        final RyaStatementBuilder builder = new RyaStatementBuilder();
        builder.setSubject(RdfToRyaConversions.convertResource(subject));
        builder.setPredicate(RdfToRyaConversions.convertURI(predicate));
        builder.setObject(RdfToRyaConversions.convertValue(object));
        final RyaStatement rstmt = builder.build();
        if (derivationLevel > 0) {
            // Reuse the statement built above instead of building a second copy.
            final DBObject obj = new SimpleMongoDBStorageStrategy().serialize(rstmt);
            obj.put("derivation_level", derivationLevel);
            getRyaDbCollection().insert(obj);
        } else {
            dao.add(rstmt);
        }
    }

    /**
     * Parses {@code query}, wraps its tuple expression in a {@link QueryRoot}, and applies
     * {@link SparqlToPipelineTransformVisitor} for the Rya collection to the tree.
     *
     * @return the transformed query tree
     */
    private QueryRoot transform(final String query) throws Exception {
        final QueryRoot queryTree = new QueryRoot(PARSER.parseQuery(query, null).getTupleExpr());
        queryTree.visit(new SparqlToPipelineTransformVisitor(getRyaCollection()));
        return queryTree;
    }

    /**
     * Transforms {@code query} and asserts that the entire query was replaced by a single pipeline
     * node directly under the root.
     *
     * @return that pipeline node
     */
    private AggregationPipelineQueryNode toPipelineNode(final String query) throws Exception {
        final QueryRoot queryTree = transform(query);
        Assert.assertTrue(queryTree.getArg() instanceof AggregationPipelineQueryNode);
        return (AggregationPipelineQueryNode) queryTree.getArg();
    }

    /**
     * Evaluates {@code pipelineNode} against an empty binding set and collects every solution.
     * The iteration is always closed, even if draining it fails.
     *
     * @return all solutions, with duplicates preserved
     */
    private static Multiset<BindingSet> evaluate(final AggregationPipelineQueryNode pipelineNode)
            throws QueryEvaluationException {
        final Multiset<BindingSet> solutions = HashMultiset.create();
        final CloseableIteration<BindingSet, QueryEvaluationException> iter =
                pipelineNode.evaluate(new QueryBindingSet());
        try {
            while (iter.hasNext()) {
                solutions.add(iter.next());
            }
        } finally {
            iter.close();
        }
        return solutions;
    }

    /**
     * Converts {@code query} to a pipeline, evaluates it, and asserts the solutions equal
     * {@code expectedSolutions} (as multisets, so duplicate counts matter).
     */
    private void testPipelineQuery(final String query, final Multiset<BindingSet> expectedSolutions)
            throws Exception {
        final AggregationPipelineQueryNode pipelineNode = toPipelineNode(query);
        Assert.assertEquals(expectedSolutions, evaluate(pipelineNode));
    }

    @Test
    public void testSingleStatementPattern() throws Exception {
        // Insert data
        insert(OWL.THING, RDF.TYPE, OWL.CLASS);
        insert(FOAF.PERSON, RDF.TYPE, OWL.CLASS, 1);
        insert(FOAF.PERSON, RDFS.SUBCLASSOF, OWL.THING);
        insert(VF.createIRI("urn:Alice"), RDF.TYPE, FOAF.PERSON);
        dao.flush();
        // Define query and expected results
        final String query = "SELECT * WHERE {\n"
                + "  ?individual a ?type .\n"
                + "}";
        final List<String> varNames = Arrays.asList("individual", "type");
        final Multiset<BindingSet> expectedSolutions = HashMultiset.create();
        expectedSolutions.add(new ListBindingSet(varNames, OWL.THING, OWL.CLASS));
        expectedSolutions.add(new ListBindingSet(varNames, FOAF.PERSON, OWL.CLASS));
        expectedSolutions.add(new ListBindingSet(varNames, VF.createIRI("urn:Alice"), FOAF.PERSON));
        // Execute pipeline and verify results
        testPipelineQuery(query, expectedSolutions);
    }

    @Test
    public void testNoVariableSP() throws Exception {
        // Insert data
        insert(OWL.THING, RDF.TYPE, OWL.CLASS);
        insert(FOAF.PERSON, RDF.TYPE, OWL.CLASS, 1);
        insert(FOAF.PERSON, RDFS.SUBCLASSOF, OWL.THING);
        insert(VF.createIRI("urn:Alice"), RDF.TYPE, FOAF.PERSON);
        dao.flush();
        // Define query and expected results
        final String query = "SELECT * WHERE {\n"
                + "  owl:Thing a owl:Class .\n"
                + "}";
        final Multiset<BindingSet> expectedSolutions = HashMultiset.create();
        expectedSolutions.add(new EmptyBindingSet());
        // With no variables the pipeline sits beneath a Projection rather than directly under the root
        final QueryRoot queryTree = transform(query);
        Assert.assertTrue(queryTree.getArg() instanceof Projection);
        final Projection projection = (Projection) queryTree.getArg();
        Assert.assertTrue(projection.getArg() instanceof AggregationPipelineQueryNode);
        final AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) projection.getArg();
        // Execute pipeline and verify results
        Assert.assertEquals(expectedSolutions, evaluate(pipelineNode));
    }

    @Test
    public void testJoinTwoSharedVariables() throws Exception {
        // Insert data
        final IRI person = VF.createIRI("urn:Person");
        final IRI livingThing = VF.createIRI("urn:LivingThing");
        final IRI human = VF.createIRI("urn:Human");
        final IRI programmer = VF.createIRI("urn:Programmer");
        final IRI thing = VF.createIRI("urn:Thing");
        insert(programmer, RDFS.SUBCLASSOF, person);
        insert(person, RDFS.SUBCLASSOF, FOAF.PERSON);
        insert(FOAF.PERSON, RDFS.SUBCLASSOF, person);
        insert(person, OWL.EQUIVALENTCLASS, human);
        insert(person, RDFS.SUBCLASSOF, livingThing);
        insert(livingThing, RDFS.SUBCLASSOF, thing);
        insert(thing, RDFS.SUBCLASSOF, OWL.THING);
        insert(OWL.THING, RDFS.SUBCLASSOF, thing);
        dao.flush();
        // Define query and expected results
        final String query = "SELECT ?A ?B WHERE {\n"
                + "  ?A rdfs:subClassOf ?B .\n"
                + "  ?B rdfs:subClassOf ?A .\n"
                + "}";
        final List<String> varNames = Arrays.asList("A", "B");
        final Multiset<BindingSet> expectedSolutions = HashMultiset.create();
        expectedSolutions.add(new ListBindingSet(varNames, person, FOAF.PERSON));
        expectedSolutions.add(new ListBindingSet(varNames, FOAF.PERSON, person));
        expectedSolutions.add(new ListBindingSet(varNames, thing, OWL.THING));
        expectedSolutions.add(new ListBindingSet(varNames, OWL.THING, thing));
        // Execute query and verify results
        testPipelineQuery(query, expectedSolutions);
    }

    @Test
    public void testVariableRename() throws Exception {
        // Insert data
        final IRI alice = VF.createIRI("urn:Alice");
        final IRI bob = VF.createIRI("urn:Bob");
        final IRI carol = VF.createIRI("urn:Carol");
        final IRI dan = VF.createIRI("urn:Dan");
        final IRI eve = VF.createIRI("urn:Eve");
        final IRI friend = VF.createIRI("urn:friend");
        insert(alice, friend, bob);
        insert(alice, friend, carol);
        insert(bob, friend, eve);
        insert(carol, friend, eve);
        insert(dan, friend, carol);
        insert(eve, friend, alice);
        // Make sure the DAO has written everything before querying, as the other tests do
        dao.flush();
        // Define non-distinct query and expected results
        final String query1 = "SELECT ?x (?z as ?friendOfFriend) WHERE {\n"
                + "  ?x <urn:friend> ?y .\n"
                + "  ?y <urn:friend> ?z .\n"
                + "}";
        final Multiset<BindingSet> expectedSolutions1 = HashMultiset.create();
        final List<String> varNames = Arrays.asList("x", "friendOfFriend");
        // alice reaches eve through both bob and carol, hence the duplicate
        expectedSolutions1.add(new ListBindingSet(varNames, alice, eve));
        expectedSolutions1.add(new ListBindingSet(varNames, alice, eve));
        expectedSolutions1.add(new ListBindingSet(varNames, bob, alice));
        expectedSolutions1.add(new ListBindingSet(varNames, carol, alice));
        expectedSolutions1.add(new ListBindingSet(varNames, dan, eve));
        expectedSolutions1.add(new ListBindingSet(varNames, eve, bob));
        expectedSolutions1.add(new ListBindingSet(varNames, eve, carol));
        // Define distinct query and expected results
        final String query2 = "SELECT DISTINCT ?x (?z as ?friendOfFriend) WHERE {\n"
                + "  ?x <urn:friend> ?y .\n"
                + "  ?y <urn:friend> ?z .\n"
                + "}";
        final Multiset<BindingSet> expectedSolutions2 = HashMultiset.create();
        expectedSolutions2.add(new ListBindingSet(varNames, alice, eve));
        expectedSolutions2.add(new ListBindingSet(varNames, bob, alice));
        expectedSolutions2.add(new ListBindingSet(varNames, carol, alice));
        expectedSolutions2.add(new ListBindingSet(varNames, dan, eve));
        expectedSolutions2.add(new ListBindingSet(varNames, eve, bob));
        expectedSolutions2.add(new ListBindingSet(varNames, eve, carol));
        // Execute and verify results
        testPipelineQuery(query1, expectedSolutions1);
        testPipelineQuery(query2, expectedSolutions2);
    }

    @Test
    public void testFilterQuery() throws Exception {
        // Insert data
        final IRI alice = VF.createIRI("urn:Alice");
        final IRI bob = VF.createIRI("urn:Bob");
        final IRI eve = VF.createIRI("urn:Eve");
        final IRI relatedTo = VF.createIRI("urn:relatedTo");
        insert(alice, FOAF.KNOWS, bob);
        insert(alice, FOAF.KNOWS, alice);
        insert(alice, FOAF.KNOWS, eve);
        insert(alice, relatedTo, bob);
        insert(bob, FOAF.KNOWS, eve);
        insert(bob, relatedTo, bob);
        dao.flush();
        // Define query 1 and expected results
        final String query1 = "SELECT * WHERE {\n"
                + "  ?x <" + FOAF.KNOWS.stringValue() + "> ?y1 .\n"
                + "  ?x <" + relatedTo.stringValue() + "> ?y2 .\n"
                + "  FILTER (?y1 != ?y2) .\n"
                + "}";
        final List<String> varNames = Arrays.asList("x", "y1", "y2");
        final Multiset<BindingSet> expected1 = HashMultiset.create();
        expected1.add(new ListBindingSet(varNames, alice, alice, bob));
        expected1.add(new ListBindingSet(varNames, alice, eve, bob));
        expected1.add(new ListBindingSet(varNames, bob, eve, bob));
        // Define query 2 and expected results
        final String query2 = "SELECT * WHERE {\n"
                + "  ?x <" + FOAF.KNOWS.stringValue() + "> ?y1 .\n"
                + "  ?x <" + relatedTo.stringValue() + "> ?y2 .\n"
                + "  FILTER (?y1 = ?y2) .\n"
                + "}";
        final Multiset<BindingSet> expected2 = HashMultiset.create();
        expected2.add(new ListBindingSet(varNames, alice, bob, bob));
        // Execute and verify results
        testPipelineQuery(query1, expected1);
        testPipelineQuery(query2, expected2);
    }

    @Test
    public void testMultiConstruct() throws Exception {
        // Insert data
        final IRI alice = VF.createIRI("urn:Alice");
        final IRI bob = VF.createIRI("urn:Bob");
        final IRI eve = VF.createIRI("urn:Eve");
        final IRI friend = VF.createIRI("urn:friend");
        final IRI knows = VF.createIRI("urn:knows");
        final IRI person = VF.createIRI("urn:Person");
        insert(alice, friend, bob);
        insert(bob, knows, eve);
        insert(eve, knows, alice);
        // Make sure the DAO has written everything before querying, as the other tests do
        dao.flush();
        // Define query and expected results
        final String query = "CONSTRUCT {\n"
                + "    ?x rdf:type owl:Thing .\n"
                + "    ?x rdf:type <urn:Person> .\n"
                + "} WHERE { ?x <urn:knows> ?y }";
        final Multiset<BindingSet> expected = HashMultiset.create();
        final List<String> varNames = Arrays.asList("subject", "predicate", "object");
        expected.add(new ListBindingSet(varNames, bob, RDF.TYPE, OWL.THING));
        expected.add(new ListBindingSet(varNames, bob, RDF.TYPE, person));
        expected.add(new ListBindingSet(varNames, eve, RDF.TYPE, OWL.THING));
        expected.add(new ListBindingSet(varNames, eve, RDF.TYPE, person));
        // Test query
        testPipelineQuery(query, expected);
    }

    @Test
    public void testTriplePipeline() throws Exception {
        final IRI alice = VF.createIRI("urn:Alice");
        final IRI bob = VF.createIRI("urn:Bob");
        final IRI eve = VF.createIRI("urn:Eve");
        final IRI friend = VF.createIRI("urn:friend");
        final IRI knows = VF.createIRI("urn:knows");
        final IRI year = VF.createIRI("urn:year");
        final Literal yearLiteral = VF.createLiteral("2017", XMLSchema.GYEAR);
        final String query = "CONSTRUCT {\n"
                + "    ?x <urn:knows> ?y .\n"
                + "    ?x <urn:year> \"2017\"^^<" + XMLSchema.GYEAR + "> .\n"
                + "} WHERE { ?x <urn:friend> ?y }";
        insert(alice, friend, bob);
        insert(bob, knows, eve);
        insert(eve, knows, alice);
        // Make sure the DAO has written everything before querying, as the other tests do
        dao.flush();
        // Get pipeline, add triple conversion, and verify that the result is a
        // properly serialized statement
        final AggregationPipelineQueryNode pipelineNode = toPipelineNode(query);
        final List<Bson> triplePipeline = pipelineNode.getTriplePipeline(System.currentTimeMillis(), false);
        final SimpleMongoDBStorageStrategy strategy = new SimpleMongoDBStorageStrategy();
        final List<Statement> results = new LinkedList<>();
        for (final Document doc : getRyaCollection().aggregate(triplePipeline)) {
            final DBObject dbo = (DBObject) JSON.parse(doc.toJson());
            final RyaStatement rstmt = strategy.deserializeDBObject(dbo);
            results.add(RyaToRdfConversions.convertStatement(rstmt));
        }
        Assert.assertEquals(2, results.size());
        Assert.assertTrue(results.contains(VF.createStatement(alice, knows, bob)));
        Assert.assertTrue(results.contains(VF.createStatement(alice, year, yearLiteral)));
    }

    @Test
    public void testRequiredDerivationLevel() throws Exception {
        // Insert data
        final IRI person = VF.createIRI("urn:Person");
        final IRI livingThing = VF.createIRI("urn:LivingThing");
        final IRI human = VF.createIRI("urn:Human");
        final IRI programmer = VF.createIRI("urn:Programmer");
        final IRI thing = VF.createIRI("urn:Thing");
        insert(programmer, RDFS.SUBCLASSOF, person);
        insert(person, RDFS.SUBCLASSOF, FOAF.PERSON);
        insert(FOAF.PERSON, RDFS.SUBCLASSOF, person);
        insert(person, OWL.EQUIVALENTCLASS, human);
        insert(person, RDFS.SUBCLASSOF, livingThing);
        insert(livingThing, RDFS.SUBCLASSOF, thing);
        insert(thing, RDFS.SUBCLASSOF, OWL.THING, 1);
        insert(OWL.THING, RDFS.SUBCLASSOF, thing);
        dao.flush();
        // Define query and expected results
        final String query = "SELECT ?A ?B WHERE {\n"
                + "  ?A rdfs:subClassOf ?B .\n"
                + "  ?B rdfs:subClassOf ?A .\n"
                + "}";
        final List<String> varNames = Arrays.asList("A", "B");
        Multiset<BindingSet> expectedSolutions = HashMultiset.create();
        expectedSolutions.add(new ListBindingSet(varNames, person, FOAF.PERSON));
        expectedSolutions.add(new ListBindingSet(varNames, FOAF.PERSON, person));
        expectedSolutions.add(new ListBindingSet(varNames, thing, OWL.THING));
        expectedSolutions.add(new ListBindingSet(varNames, OWL.THING, thing));
        // Prepare query and convert to pipeline
        final AggregationPipelineQueryNode pipelineNode = toPipelineNode(query);
        // Extend the pipeline by requiring a derivation level of zero (should have no effect)
        pipelineNode.requireSourceDerivationDepth(0);
        Assert.assertEquals(expectedSolutions, evaluate(pipelineNode));
        // Extend the pipeline by requiring a derivation level of one (should produce the thing/thing pair)
        expectedSolutions = HashMultiset.create();
        expectedSolutions.add(new ListBindingSet(varNames, thing, OWL.THING));
        expectedSolutions.add(new ListBindingSet(varNames, OWL.THING, thing));
        pipelineNode.requireSourceDerivationDepth(1);
        Assert.assertEquals(expectedSolutions, evaluate(pipelineNode));
    }

    @Test
    public void testRequiredTimestamp() throws Exception {
        // Insert data
        final IRI person = VF.createIRI("urn:Person");
        final IRI livingThing = VF.createIRI("urn:LivingThing");
        final IRI human = VF.createIRI("urn:Human");
        final IRI programmer = VF.createIRI("urn:Programmer");
        final IRI thing = VF.createIRI("urn:Thing");
        insert(programmer, RDFS.SUBCLASSOF, person);
        insert(person, RDFS.SUBCLASSOF, FOAF.PERSON, 2);
        insert(FOAF.PERSON, RDFS.SUBCLASSOF, person);
        insert(person, OWL.EQUIVALENTCLASS, human);
        insert(person, RDFS.SUBCLASSOF, livingThing);
        insert(livingThing, RDFS.SUBCLASSOF, thing);
        insert(thing, RDFS.SUBCLASSOF, OWL.THING);
        insert(OWL.THING, RDFS.SUBCLASSOF, thing);
        dao.flush();
        // Define query and expected results
        final String query = "SELECT ?A ?B WHERE {\n"
                + "  ?A rdfs:subClassOf ?B .\n"
                + "  ?B rdfs:subClassOf ?A .\n"
                + "}";
        final List<String> varNames = Arrays.asList("A", "B");
        final Multiset<BindingSet> expectedSolutions = HashMultiset.create();
        expectedSolutions.add(new ListBindingSet(varNames, person, FOAF.PERSON));
        expectedSolutions.add(new ListBindingSet(varNames, FOAF.PERSON, person));
        expectedSolutions.add(new ListBindingSet(varNames, thing, OWL.THING));
        expectedSolutions.add(new ListBindingSet(varNames, OWL.THING, thing));
        // Prepare query and convert to pipeline
        final AggregationPipelineQueryNode pipelineNode = toPipelineNode(query);
        // Extend the pipeline by requiring a timestamp of zero (should have no effect)
        pipelineNode.requireSourceTimestamp(0);
        Assert.assertEquals(expectedSolutions, evaluate(pipelineNode));
        // Extend the pipeline by requiring a future timestamp (should produce no results)
        final long delta = TimeUnit.DAYS.toMillis(1);
        pipelineNode.requireSourceTimestamp(System.currentTimeMillis() + delta);
        Assert.assertEquals(HashMultiset.<BindingSet>create(), evaluate(pipelineNode));
    }
}