RYA-417 Batch forward-chaining rules engine. Closes #255.
diff --git a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java
index 7a84f5d..45092e4 100644
--- a/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java
+++ b/dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/aggregation/AggregationPipelineQueryNode.java
@@ -531,7 +531,9 @@
* The number of documents produced by the pipeline after this operation
* will be the number of documents entering this stage (the number of
* intermediate results) multiplied by the number of
- * {@link ProjectionElemList}s supplied here.
+ * {@link ProjectionElemList}s supplied here. Empty projections are
+ * unsupported; if one or more of the given projections binds zero variables,
+ * then the pipeline will be unchanged and the method will return false.
* @param projections One or more projections, i.e. mappings from the result
* at this stage of the query into a set of variables.
* @return true if the projection(s) were added to the pipeline.
@@ -544,6 +546,10 @@
Set<String> bindingNamesUnion = new HashSet<>();
Set<String> bindingNamesIntersection = null;
for (ProjectionElemList projection : projections) {
+ if (projection.getElements().isEmpty()) {
+ // Empty projections are unsupported -- fail when seen
+ return false;
+ }
Document valueDoc = new Document();
Document hashDoc = new Document();
Document typeDoc = new Document();
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineQueryIT.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineQueryIT.java
index 45855a0..0552ac0 100644
--- a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineQueryIT.java
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/PipelineQueryIT.java
@@ -49,8 +49,10 @@
import org.openrdf.model.vocabulary.XMLSchema;
import org.openrdf.query.BindingSet;
import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.algebra.Projection;
import org.openrdf.query.algebra.QueryRoot;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
+import org.openrdf.query.impl.EmptyBindingSet;
import org.openrdf.query.impl.ListBindingSet;
import org.openrdf.query.parser.sparql.SPARQLParser;
@@ -135,6 +137,36 @@
}
@Test
+ public void testNoVariableSP() throws Exception {
+ // Insert data
+ insert(OWL.THING, RDF.TYPE, OWL.CLASS);
+ insert(FOAF.PERSON, RDF.TYPE, OWL.CLASS, 1);
+ insert(FOAF.PERSON, RDFS.SUBCLASSOF, OWL.THING);
+ insert(VF.createURI("urn:Alice"), RDF.TYPE, FOAF.PERSON);
+ dao.flush();
+ // Define query and expected results
+ final String query = "SELECT * WHERE {\n"
+ + " owl:Thing a owl:Class .\n"
+ + "}";
+ Multiset<BindingSet> expectedSolutions = HashMultiset.create();
+ expectedSolutions.add(new EmptyBindingSet());
+ // Execute pipeline and verify results
+ QueryRoot queryTree = new QueryRoot(PARSER.parseQuery(query, null).getTupleExpr());
+ SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(getRyaCollection());
+ queryTree.visit(visitor);
+ Assert.assertTrue(queryTree.getArg() instanceof Projection);
+ Projection projection = (Projection) queryTree.getArg();
+ Assert.assertTrue(projection.getArg() instanceof AggregationPipelineQueryNode);
+ AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) projection.getArg();
+ Multiset<BindingSet> solutions = HashMultiset.create();
+ CloseableIteration<BindingSet, QueryEvaluationException> iter = pipelineNode.evaluate(new QueryBindingSet());
+ while (iter.hasNext()) {
+ solutions.add(iter.next());
+ }
+ Assert.assertEquals(expectedSolutions, solutions);
+ }
+
+ @Test
public void testJoinTwoSharedVariables() throws Exception {
// Insert data
URI person = VF.createURI("urn:Person");
diff --git a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitorTest.java b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitorTest.java
index cc9349b..506b8af 100644
--- a/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitorTest.java
+++ b/dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/aggregation/SparqlToPipelineTransformVisitorTest.java
@@ -29,6 +29,7 @@
import org.openrdf.model.URI;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.OWL;
import org.openrdf.model.vocabulary.RDF;
import org.openrdf.query.algebra.Extension;
import org.openrdf.query.algebra.ExtensionElem;
@@ -153,6 +154,19 @@
}
@Test
+ public void testEmptyProjection() throws Exception {
+ StatementPattern isClass = new StatementPattern(constant(UNDERGRAD), constant(RDF.TYPE), constant(OWL.CLASS));
+ QueryRoot queryTree = new QueryRoot(new Projection(isClass, new ProjectionElemList()));
+ SparqlToPipelineTransformVisitor visitor = new SparqlToPipelineTransformVisitor(collection);
+ queryTree.visit(visitor);
+ Assert.assertTrue(queryTree.getArg() instanceof Projection);
+ Projection projectNode = (Projection) queryTree.getArg();
+ Assert.assertTrue(projectNode.getArg() instanceof AggregationPipelineQueryNode);
+ AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) projectNode.getArg();
+ Assert.assertEquals(Sets.newHashSet(), pipelineNode.getAssuredBindingNames());
+ }
+
+ @Test
public void testMultiProjection() throws Exception {
StatementPattern isUndergrad = new StatementPattern(new Var("x"), constant(RDF.TYPE), constant(UNDERGRAD));
StatementPattern isCourse = new StatementPattern(new Var("course"), constant(RDF.TYPE), constant(COURSE));
diff --git a/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
index b5adee3..56af9b4 100644
--- a/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
+++ b/extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java
@@ -88,33 +88,10 @@
// Get a reference to a Mongo DB configuration object.
final MongoDBRdfConfiguration mongoConfig = (config instanceof MongoDBRdfConfiguration) ?
(MongoDBRdfConfiguration)config : new MongoDBRdfConfiguration(config);
-
- // Create the MongoClient that will be used by the Sail object's components.
- final MongoClient client = createMongoClient(mongoConfig);
-
- // Add the Indexer and Optimizer names to the configuration object that are configured to be used.
- ConfigUtils.setIndexers(mongoConfig);
-
- // Populate the configuration using previously stored Rya Details if this instance uses them.
- try {
- final MongoRyaInstanceDetailsRepository ryaDetailsRepo = new MongoRyaInstanceDetailsRepository(client, mongoConfig.getRyaInstanceName());
- RyaDetailsToConfiguration.addRyaDetailsToConfiguration(ryaDetailsRepo.getRyaInstanceDetails(), mongoConfig);
- } catch (final RyaDetailsRepositoryException e) {
- LOG.info("Instance does not have a rya details collection, skipping.");
- }
-
- // Set the configuration to the stateful configuration that is used to pass the constructed objects around.
- final StatefulMongoDBRdfConfiguration statefulConfig = new StatefulMongoDBRdfConfiguration(mongoConfig, client);
- final List<MongoSecondaryIndex> indexers = statefulConfig.getInstances(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, MongoSecondaryIndex.class);
- statefulConfig.setIndexers(indexers);
- rdfConfig = statefulConfig;
-
- // Create the DAO that is able to interact with MongoDB.
- final MongoDBRyaDAO mongoDao = new MongoDBRyaDAO();
- mongoDao.setConf(statefulConfig);
- mongoDao.init();
- dao = mongoDao;
-
+ // Instantiate a Mongo client and Mongo DAO.
+ dao = getMongoDAO(mongoConfig);
+ // Then use the DAO's newly-created stateful conf in place of the original
+ rdfConfig = dao.getConf();
} else {
rdfConfig = new AccumuloRdfConfiguration(config);
user = rdfConfig.get(ConfigUtils.CLOUDBASE_USER);
@@ -237,4 +214,37 @@
LOG.info("Instance does not have a rya details collection, skipping.");
}
}
+
+ /**
+ * Connects to MongoDB and creates a MongoDBRyaDAO.
+ * @param config - user configuration
+ * @return - MongoDBRyaDAO with Indexers configured according to user's specification
+ * @throws RyaDAOException if the DAO can't be initialized
+ */
+ public static MongoDBRyaDAO getMongoDAO(MongoDBRdfConfiguration mongoConfig) throws RyaDAOException {
+ // Create the MongoClient that will be used by the Sail object's components.
+ final MongoClient client = createMongoClient(mongoConfig);
+
+ // Add the Indexer and Optimizer names to the configuration object that are configured to be used.
+ ConfigUtils.setIndexers(mongoConfig);
+
+ // Populate the configuration using previously stored Rya Details if this instance uses them.
+ try {
+ final MongoRyaInstanceDetailsRepository ryaDetailsRepo = new MongoRyaInstanceDetailsRepository(client, mongoConfig.getRyaInstanceName());
+ RyaDetailsToConfiguration.addRyaDetailsToConfiguration(ryaDetailsRepo.getRyaInstanceDetails(), mongoConfig);
+ } catch (final RyaDetailsRepositoryException e) {
+ LOG.info("Instance does not have a rya details collection, skipping.");
+ }
+
+ // Set the configuration to the stateful configuration that is used to pass the constructed objects around.
+ final StatefulMongoDBRdfConfiguration statefulConfig = new StatefulMongoDBRdfConfiguration(mongoConfig, client);
+ final List<MongoSecondaryIndex> indexers = statefulConfig.getInstances(AccumuloRdfConfiguration.CONF_ADDITIONAL_INDEXERS, MongoSecondaryIndex.class);
+ statefulConfig.setIndexers(indexers);
+
+ // Create the DAO that is able to interact with MongoDB.
+ final MongoDBRyaDAO mongoDao = new MongoDBRyaDAO();
+ mongoDao.setConf(statefulConfig);
+ mongoDao.init();
+ return mongoDao;
+ }
}
\ No newline at end of file
diff --git a/extras/pom.xml b/extras/pom.xml
index 62220ca..4ebcb82 100644
--- a/extras/pom.xml
+++ b/extras/pom.xml
@@ -45,6 +45,7 @@
<module>rya.merger</module>
<module>rya.giraph</module>
<module>rya.streams</module>
+ <module>rya.forwardchain</module>
</modules>
<profiles>
diff --git a/extras/rya.forwardchain/pom.xml b/extras/rya.forwardchain/pom.xml
new file mode 100644
index 0000000..7acabca
--- /dev/null
+++ b/extras/rya.forwardchain/pom.xml
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.extras</artifactId>
+ <version>3.2.12-incubating-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>rya.forwardchain</artifactId>
+ <name>Apache Rya Forward Chaining Inference</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.openrdf.sesame</groupId>
+ <artifactId>sesame-runtime</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.sail</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>rya.indexing</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.rya</groupId>
+ <artifactId>mongodb.rya</artifactId>
+ </dependency>
+
+ <!-- Testing -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <configuration>
+ <archive>
+ <index>true</index>
+ <manifest>
+ <addClasspath>true</addClasspath>
+ <mainClass>org.apache.rya.forwardchain.batch.ForwardChainSpinTool</mainClass>
+ </manifest>
+ </archive>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <configuration>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <shadedArtifactAttached>true</shadedArtifactAttached>
+ <shadedClassifierName>shaded</shadedClassifierName>
+ <transformers>
+ <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/ForwardChainConstants.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/ForwardChainConstants.java
new file mode 100644
index 0000000..f1fe8b3
--- /dev/null
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/ForwardChainConstants.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain;
+
+import org.apache.rya.api.RdfCloudTripleStoreConstants;
+import org.apache.rya.api.domain.RyaSchema;
+import org.apache.rya.api.domain.RyaURI;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.openrdf.model.URI;
+import org.openrdf.model.ValueFactory;
+
+public class ForwardChainConstants {
+ private static final ValueFactory VF = RdfCloudTripleStoreConstants.VALUE_FACTORY;
+ private static final String NAMESPACE = RyaSchema.NAMESPACE;
+
+ public static final URI DERIVATION_TIME = VF.createURI(NAMESPACE, "forwardChainIteration");
+ public static final URI DERIVATION_RULE = VF.createURI(NAMESPACE, "forwardChainRule");
+
+ public static final RyaURI RYA_DERIVATION_RULE = RdfToRyaConversions.convertURI(DERIVATION_RULE);
+ public static final RyaURI RYA_DERIVATION_TIME = RdfToRyaConversions.convertURI(DERIVATION_TIME);
+}
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/ForwardChainException.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/ForwardChainException.java
new file mode 100644
index 0000000..64b05a4
--- /dev/null
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/ForwardChainException.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain;
+
+/**
+ * Broad exception representing an error during forward chaining. Useful for
+ * wrapping the diverse kinds of exceptions that may be thrown by
+ * implementations of reasoning logic.
+ */
+public class ForwardChainException extends Exception {
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Constructs a new ForwardChainException with a message and a cause.
+ * @param string Detail message
+ * @param e Underlying cause
+ */
+ public ForwardChainException(String string, Exception e) {
+ super(string , e);
+ }
+
+ /**
+ * Constructs a new ForwardChainException with a message only.
+ * @param string Detail message
+ */
+ public ForwardChainException(String string) {
+ super(string);
+ }
+
+ /**
+ * Constructs a new ForwardChainException with a root cause and no
+ * additional message.
+ * @param e Underlying cause
+ */
+ public ForwardChainException(Exception e) {
+ super(e);
+ }
+}
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/batch/AbstractForwardChainTool.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/batch/AbstractForwardChainTool.java
new file mode 100644
index 0000000..db08407
--- /dev/null
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/batch/AbstractForwardChainTool.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.batch;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Tool;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.forwardchain.ForwardChainException;
+import org.apache.rya.forwardchain.rule.Ruleset;
+import org.apache.rya.forwardchain.strategy.AbstractForwardChainStrategy;
+import org.apache.rya.forwardchain.strategy.AbstractRuleExecutionStrategy;
+import org.apache.rya.forwardchain.strategy.MongoPipelineStrategy;
+import org.apache.rya.forwardchain.strategy.RoundRobinStrategy;
+import org.apache.rya.forwardchain.strategy.SailExecutionStrategy;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import com.google.common.base.Preconditions;
+
+/**
+ * Base class for a {@link Tool} that executes forward-chaining rules until
+ * completion (when no more new information can be derived).
+ * <p>
+ * Subclasses must implement {@link #getRuleset()} to yield the specific set of
+ * {@link Rule}s to materialize.
+ * <p>
+ * Subclasses may additionally override {@link #getStrategy()} and/or
+ * {@link #getRuleStrategy()} to provide specific forward chaining execution
+ * logic.
+ */
+public abstract class AbstractForwardChainTool implements Tool {
+ private static final Logger logger = Logger.getLogger(AbstractForwardChainTool.class);
+
+ private RdfCloudTripleStoreConfiguration conf;
+
+ private long numInferences = 0;
+
+ /**
+ * Set the {@link Configuration} for this tool, which will be converted to
+ * an {@link RdfCloudTripleStoreConfiguration}.
+ * @param conf Configuration object that specifies Rya connection details.
+ * Should not be null.
+ */
+ @Override
+ public void setConf(Configuration conf) {
+ Preconditions.checkNotNull(conf);
+ if (conf.getBoolean(ConfigUtils.USE_MONGO, false)) {
+ this.conf = new MongoDBRdfConfiguration(conf);
+ }
+ else {
+ this.conf = new AccumuloRdfConfiguration(conf);
+ }
+ }
+
+ /**
+ * Get the RdfCloudTripleStoreConfiguration used by this tool.
+ * @return Rya configuration object.
+ */
+ @Override
+ public RdfCloudTripleStoreConfiguration getConf() {
+ return conf;
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ numInferences = getStrategy().executeAll(getRuleset());
+ logger.info("Forward chaining complete; made " + numInferences + " inferences.");
+ return 0;
+ }
+
+ /**
+ * Gets the number of inferences that have been made.
+ * @return zero before forward chaining, or the total number of inferences
+ * after.
+ */
+ public long getNumInferences() {
+ return numInferences;
+ }
+
+ /**
+ * Get the high-level {@link AbstractForwardChainStrategy} that governs how
+ * reasoning will proceed. By default, returns a {@link RoundRobinStrategy}
+ * which executes each relevant rule one-by-one, then moves to the next
+ * iteration and repeats, until no rules are still relevant. Subclasses may
+ * override this method to provide alternative strategies.
+ * @return The high-level forward chaining logic.
+ * @throws ForwardChainException if the strategy can't be instantiated.
+ */
+ protected AbstractForwardChainStrategy getStrategy() throws ForwardChainException {
+ return new RoundRobinStrategy(getRuleStrategy());
+ }
+
+ /**
+ * Get the low-level {@link AbstractRuleExecutionStrategy} that governs the
+ * application of rules on an individual basis. This is used by the default
+ * ForwardChainStrategy (RoundRobinStrategy) and may be used by any
+ * high-level strategy that executes rules individually. By default, returns
+ * a {@link MongoPipelineStrategy} if the configuration object specifies a
+ * MongoDB connection with aggregation pipelines enabled, and a
+ * {@link SailExecutionStrategy} otherwise. Subclasses may override this
+ * method to provide alternative strategies.
+ * @return The low-level rule execution logic.
+     * @throws ForwardChainException if the strategy can't be instantiated.
+ */
+ protected AbstractRuleExecutionStrategy getRuleStrategy() throws ForwardChainException {
+ if (ConfigUtils.getUseMongo(conf)) {
+ final MongoDBRdfConfiguration mongoConf;
+ if (conf instanceof MongoDBRdfConfiguration) {
+ mongoConf = (MongoDBRdfConfiguration) conf;
+ }
+ else {
+ mongoConf = new MongoDBRdfConfiguration(conf);
+ }
+ if (mongoConf.getUseAggregationPipeline()) {
+ return new MongoPipelineStrategy(mongoConf);
+ }
+ }
+ return new SailExecutionStrategy(conf);
+ }
+
+ /**
+ * Get the set of rules for this tool to apply. Subclasses should implement
+ * this for their specific domains. The subclass should ensure that the
+ * ruleset returned only contains rules whose types are supported by the
+ * forward chaining strategy. The default strategy supports only CONSTRUCT
+ * rules, so the ruleset should only contain {@link AbstractConstructRule}s.
+ * @return A set of forward-chaining rules.
+ * @throws ForwardChainException if rules couldn't be retrieved.
+ */
+ protected abstract Ruleset getRuleset() throws ForwardChainException;
+}
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/batch/ForwardChainSpinTool.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/batch/ForwardChainSpinTool.java
new file mode 100644
index 0000000..c35f37e
--- /dev/null
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/batch/ForwardChainSpinTool.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.batch;
+
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.forwardchain.ForwardChainException;
+import org.apache.rya.forwardchain.rule.Ruleset;
+import org.apache.rya.forwardchain.rule.SpinConstructRule;
+
+/**
+ * {@link Tool} to load SPIN Construct rules from a Rya data store, then apply
+ * those rules to the same store using forward-chaining inference
+ * (materialization), adding triples back to Rya until no more information can
+ * be derived.
+ */
+public class ForwardChainSpinTool extends AbstractForwardChainTool {
+ private Ruleset ruleset;
+
+ /**
+ * Constructor that takes in an {@link RdfCloudTripleStoreConfiguration}.
+ * @param conf Configuration object containing Rya connection information.
+ */
+ public ForwardChainSpinTool(RdfCloudTripleStoreConfiguration conf) {
+ setConf(conf);
+ }
+
+ /**
+ * Default constructor that does not take in a configuration object. Rya
+ * connection details should be provided via an
+ * RdfCloudTripleStoreConfiguration, either using
+ * {@link AbstractForwardChainTool#setConf} or a {@link ToolRunner}.
+ */
+ public ForwardChainSpinTool() { }
+
+ /**
+ * Load SPIN Construct rules from Rya.
+ * @return A set of construct query rules.
+ * @throws ForwardChainException if loading rules from Rya fails.
+ */
+ @Override
+ protected Ruleset getRuleset() throws ForwardChainException {
+ if (ruleset == null) {
+ ruleset = SpinConstructRule.loadSpinRules(getConf());
+ }
+ return ruleset;
+ }
+
+ public static void main(String[] args) throws Exception {
+ long start = System.currentTimeMillis();
+ ForwardChainSpinTool tool = new ForwardChainSpinTool();
+ ToolRunner.run(tool, args);
+ long end = System.currentTimeMillis();
+ double seconds = (end - start) / 1000.0;
+ long inferences = tool.getNumInferences();
+ long rules = tool.getRuleset().getRules().size();
+ System.out.println(String.format("ForwardChainSpinTool: %d rules, %d inferences, %.3f seconds",
+ rules, inferences, seconds));
+ }
+}
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/AbstractConstructRule.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/AbstractConstructRule.java
new file mode 100644
index 0000000..c4c12c7
--- /dev/null
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/AbstractConstructRule.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.rule;
+
+import org.apache.rya.api.domain.StatementMetadata;
+import org.apache.rya.forwardchain.ForwardChainException;
+import org.apache.rya.forwardchain.strategy.AbstractRuleExecutionStrategy;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.parser.ParsedGraphQuery;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A rule that produces new triples, and can be expressed as a graph query
+ * (SPARQL "CONSTRUCT"). Should not modify existing triples.
+ */
+public abstract class AbstractConstructRule implements Rule {
+ /**
+ * Get the query tree corresponding to this construct rule.
+ * @return The query algebra representation of this rule.
+ */
+ public abstract ParsedGraphQuery getQuery();
+
+ @Override
+ public long execute(AbstractRuleExecutionStrategy strategy,
+ StatementMetadata metadata) throws ForwardChainException {
+ Preconditions.checkNotNull(strategy);
+ Preconditions.checkNotNull(metadata);
+ return strategy.executeConstructRule(this, metadata);
+ }
+
+ /**
+ * Whether any of the possible consequents of this rule include anonymous
+ * variables. Care should be taken when executing such rules, so that
+ * repeated application doesn't continually produce new bnodes.
+ * @return true if any subject, predicate, or object variable involved in a
+ * consequent is flagged as anonymous.
+ */
+ public boolean hasAnonymousConsequent() {
+ for (StatementPattern sp : getConsequentPatterns()) {
+ if (sp.getSubjectVar().isAnonymous()
+ || sp.getPredicateVar().isAnonymous()
+ || sp.getObjectVar().isAnonymous()) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/AbstractInconsistencyRule.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/AbstractInconsistencyRule.java
new file mode 100644
index 0000000..451c5e4
--- /dev/null
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/AbstractInconsistencyRule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.rule;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.rya.api.domain.StatementMetadata;
+import org.apache.rya.forwardchain.ForwardChainException;
+import org.apache.rya.forwardchain.strategy.AbstractRuleExecutionStrategy;
+import org.openrdf.query.algebra.StatementPattern;
+
+/**
+ * A rule that identifies an inconsistency in the data, but does not add or
+ * modify any triples.
+ */
+public abstract class AbstractInconsistencyRule implements Rule {
+
+    /**
+     * Inconsistency rules derive no triples, so no statement pattern can ever
+     * be concluded by one.
+     * @param sp A statement pattern describing a possible inferred triple.
+     * @return Always false.
+     */
+    @Override
+    public boolean canConclude(StatementPattern sp) {
+        return false;
+    }
+
+    /**
+     * Inconsistency rules have no consequents.
+     * @return An empty collection.
+     */
+    @Override
+    public Collection<StatementPattern> getConsequentPatterns() {
+        return Arrays.asList();
+    }
+
+    /**
+     * Executes this rule by delegating to the strategy's inconsistency-rule
+     * handler.
+     * @param strategy The execution strategy; should not be null.
+     * @param metadata Metadata describing the derivation context.
+     * @return The count reported by the strategy for this rule's execution.
+     * @throws ForwardChainException if rule application fails.
+     */
+    @Override
+    public long execute(AbstractRuleExecutionStrategy strategy,
+            StatementMetadata metadata) throws ForwardChainException {
+        return strategy.executeInconsistencyRule(this, metadata);
+    }
+
+}
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/AbstractUpdateRule.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/AbstractUpdateRule.java
new file mode 100644
index 0000000..d87aeae
--- /dev/null
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/AbstractUpdateRule.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.rule;
+
+import org.apache.rya.api.domain.StatementMetadata;
+import org.apache.rya.forwardchain.ForwardChainException;
+import org.apache.rya.forwardchain.strategy.AbstractRuleExecutionStrategy;
+
+/**
+ * A rule that modifies existing data.
+ */
+public abstract class AbstractUpdateRule implements Rule {
+    /**
+     * Executes this rule by delegating to the strategy's update-rule handler.
+     * @param strategy The execution strategy; should not be null.
+     * @param metadata Metadata to associate with any modified triples.
+     * @return The count reported by the strategy for this rule's execution.
+     * @throws ForwardChainException if rule application fails.
+     */
+    @Override
+    public long execute(AbstractRuleExecutionStrategy strategy,
+            StatementMetadata metadata) throws ForwardChainException {
+        return strategy.executeUpdateRule(this, metadata);
+    }
+}
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/AntecedentVisitor.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/AntecedentVisitor.java
new file mode 100644
index 0000000..1f2cbba
--- /dev/null
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/AntecedentVisitor.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.rule;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+
+/**
+ * Query visitor that identifies all triple patterns represented as
+ * {@link StatementPattern}s in a query, which therefore represent triples
+ * that could potentially contribute to a solution. Considers only the statement
+ * patterns themselves, i.e. the leaves of the query tree, and does not consider
+ * other constraints that may restrict the set of triples that may be relevant.
+ * This means relying on this analysis to determine whether a fact can be part
+ * of a solution can yield false positives, but not false negatives.
+ */
+class AntecedentVisitor extends QueryModelVisitorBase<RuntimeException> {
+    // Accumulates a copy of every StatementPattern found in the query tree.
+    private Set<StatementPattern> antecedentStatementPatterns = new HashSet<>();
+
+    /**
+     * Get the StatementPatterns used by this query.
+     * @return A set of patterns that can contribute to query solutions.
+     */
+    public Set<StatementPattern> getAntecedents() {
+        return antecedentStatementPatterns;
+    }
+
+    @Override
+    public void meet(StatementPattern sp) {
+        // Clone so that later modification of the query tree does not mutate
+        // the collected antecedent patterns.
+        antecedentStatementPatterns.add(sp.clone());
+    }
+}
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/ConstructConsequentVisitor.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/ConstructConsequentVisitor.java
new file mode 100644
index 0000000..e28dbe3
--- /dev/null
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/ConstructConsequentVisitor.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.rule;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.openrdf.model.Value;
+import org.openrdf.query.algebra.BNodeGenerator;
+import org.openrdf.query.algebra.Extension;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.MultiProjection;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.ProjectionElem;
+import org.openrdf.query.algebra.ProjectionElemList;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+
+/**
+ * Query visitor that identifies all triple patterns produced by a "CONSTRUCT"
+ * query. Finds the topmost instance of a {@link Projection} or
+ * {@link MultiProjection}, and expects the variables projected to include
+ * "subject", "predicate", and "object". Each projection is converted to a
+ * {@link StatementPattern}, where any constant values are expected to be
+ * provided by an Extension directly underneath the projection, if applicable.
+ * <p>
+ * Undefined behavior if applied to a query other than a CONSTRUCT query.
+ * <p>
+ * Does not report any constraints on possible consequent triples beyond the
+ * constant values, where appropriate, of each part of the triple. Therefore,
+ * this analysis may produce an overly broad set of possible consequents
+ * compared to some more sophisticated method.
+ */
+public class ConstructConsequentVisitor extends QueryModelVisitorBase<RuntimeException> {
+ private Set<StatementPattern> consequentStatementPatterns = new HashSet<>();
+
+ private static final String SUBJECT_VAR_NAME = "subject";
+ private static final String PREDICATE_VAR_NAME = "predicate";
+ private static final String OBJECT_VAR_NAME = "object";
+
+ /**
+ * Get the possible conclusions of this construct rule.
+ * @return StatementPatterns representing the possible triple patterns that
+ * can be inferred.
+ */
+ public Set<StatementPattern> getConsequents() {
+ return consequentStatementPatterns;
+ }
+
+ /**
+ * Get the names of any bnodes generated by this construct rule.
+ * @return Variable names corresponding to new entities
+ */
+ public Set<StatementPattern> getBnodes() {
+ return consequentStatementPatterns;
+ }
+
+ @Override
+ public void meet(Projection projection) {
+ if (projection.getArg() instanceof Extension) {
+ recordConsequent(projection.getProjectionElemList(),
+ ((Extension) projection.getArg()).getElements());
+ }
+ else {
+ recordConsequent(projection.getProjectionElemList(), Arrays.asList());
+ }
+ }
+
+ @Override
+ public void meet(MultiProjection projection) {
+ List<ExtensionElem> bindings;
+ if (projection.getArg() instanceof Extension) {
+ bindings = ((Extension) projection.getArg()).getElements();
+ }
+ else {
+ bindings = Arrays.asList();
+ }
+ for (ProjectionElemList template : projection.getProjections()) {
+ recordConsequent(template, bindings);
+ }
+ }
+
+ private void recordConsequent(ProjectionElemList variables, List<ExtensionElem> extensionElements) {
+ Map<String, Value> bindings = new ConcurrentHashMap<>();
+ Map<String, Value> values = new ConcurrentHashMap<>();
+ Set<String> queryBnodes = new HashSet<>();
+ Set<String> projectedBnodes = new HashSet<>();
+ for (ExtensionElem ee : extensionElements) {
+ if (ee.getExpr() instanceof ValueConstant) {
+ bindings.put(ee.getName(), ((ValueConstant) ee.getExpr()).getValue());
+ }
+ else if (ee.getExpr() instanceof BNodeGenerator) {
+ queryBnodes.add(ee.getName());
+ }
+ }
+ for (ProjectionElem var : variables.getElements()) {
+ String sourceName = var.getSourceName();
+ String targetName = var.getTargetName();
+ Value constValue = bindings.get(sourceName);
+ if (constValue != null) {
+ values.put(targetName, constValue);
+ }
+ else if (queryBnodes.contains(sourceName)) {
+ projectedBnodes.add(targetName);
+ }
+ }
+ Var subjVar = new Var(SUBJECT_VAR_NAME, values.get(SUBJECT_VAR_NAME));
+ Var predVar = new Var(PREDICATE_VAR_NAME, values.get(PREDICATE_VAR_NAME));
+ Var objVar = new Var(OBJECT_VAR_NAME, values.get(OBJECT_VAR_NAME));
+ subjVar.setAnonymous(projectedBnodes.contains(SUBJECT_VAR_NAME));
+ predVar.setAnonymous(projectedBnodes.contains(PREDICATE_VAR_NAME));
+ objVar.setAnonymous(projectedBnodes.contains(OBJECT_VAR_NAME));
+ StatementPattern sp = new StatementPattern(subjVar, predVar, objVar);
+ consequentStatementPatterns.add(sp);
+ }
+}
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/Rule.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/Rule.java
new file mode 100644
index 0000000..74004b9
--- /dev/null
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/Rule.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.rule;
+
+import java.util.Collection;
+
+import org.apache.rya.api.domain.StatementMetadata;
+import org.apache.rya.forwardchain.ForwardChainException;
+import org.apache.rya.forwardchain.strategy.AbstractRuleExecutionStrategy;
+import org.openrdf.query.algebra.StatementPattern;
+
+/**
+ * Represents a forward-chaining inference rule. A rule is triggered by some
+ * combination of triples, and may produce some combination of triples when
+ * applied. Potential triggers (antecedents) and potential results (consequents)
+ * are represented in a general form as {@link StatementPattern}s and can be
+ * used to determine relationships between rules.
+ */
+public interface Rule {
+    /**
+     * Whether this rule, if applied, could produce triples of a given form.
+     * @param sp A statement pattern describing a possible inferred triple;
+     * assumed not null.
+     * @return true if a consequent of this rule could match the pattern.
+     */
+    boolean canConclude(StatementPattern sp);
+
+    /**
+     * All {@link StatementPattern}s that can, in some combination, trigger this
+     * rule. Should be a complete set, such that if no statements matching any
+     * of the patterns exist, the rule cannot derive any new information.
+     * @return Any number of statement patterns.
+     */
+    Collection<StatementPattern> getAntecedentPatterns();
+
+    /**
+     * {@link StatementPattern}s completely describing the possible conclusions
+     * of this rule. Any derived statement should match one of these patterns.
+     * @return Any number of statement patterns.
+     */
+    Collection<StatementPattern> getConsequentPatterns();
+
+    /**
+     * Given an {@link AbstractRuleExecutionStrategy}, executes this rule.
+     * Associates any new or modified triples with the specified statement
+     * metadata.
+     * @param strategy A strategy capable of applying individual rules; should
+     * not be null.
+     * @param metadata StatementMetadata to add to any results. Can be used to
+     * record the circumstances of the derivation. Should not be null; use
+     * {@link StatementMetadata#EMPTY_METADATA} to add none. Implementing
+     * classes may add additional metadata specific to the rule.
+     * @return The number of new inferences made during rule execution.
+     * @throws ForwardChainException if an error was encountered during
+     * rule application.
+     */
+    long execute(AbstractRuleExecutionStrategy strategy,
+            StatementMetadata metadata) throws ForwardChainException;
+}
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/Ruleset.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/Ruleset.java
new file mode 100644
index 0000000..965d2d3
--- /dev/null
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/Ruleset.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.rule;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.openrdf.query.algebra.StatementPattern;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Represents a set of forward-chaining {@link Rule}s and their relationships.
+ */
+public class Ruleset {
+ private final Set<Rule> rules;
+ private final Map<Rule, Set<Rule>> successors;
+ private final Map<Rule, Set<Rule>> predecessors;
+
+ private final Logger logger = Logger.getLogger(this.getClass());
+
+ /**
+ * Constructor. Takes in a set of rules and determines their dependencies.
+ * @param rules The complete set of rules to process; should not be null.
+ */
+ public Ruleset(Collection<Rule> rules) {
+ Preconditions.checkNotNull(rules);
+ this.rules = new HashSet<>();
+ for (Rule rule : rules) {
+ if (rule != null) {
+ this.rules.add(rule);
+ }
+ }
+ successors = new ConcurrentHashMap<>();
+ predecessors = new ConcurrentHashMap<>();
+ // Build the dependency graph of all the rules, in both directions
+ for (Rule rule : rules) {
+ successors.put(rule, new HashSet<>());
+ predecessors.put(rule, new HashSet<>());
+ }
+ for (Rule rule1 : rules) {
+ for (Rule rule2 : rules) {
+ if (canTrigger(rule1, rule2)) {
+ logger.trace("\t" + rule1.toString() + " can trigger " + rule2.toString());
+ successors.get(rule1).add(rule2);
+ predecessors.get(rule2).add(rule1);
+ }
+ }
+ }
+ }
+
+ /**
+ * Get the rules associated with this ruleset.
+ * @return The complete set of rules.
+ */
+ public Set<Rule> getRules() {
+ return rules;
+ }
+
+ /**
+ * Given a rule, return the set of all rules that it may trigger. That is,
+ * if the rule were to produce inferences, those inferences might directly
+ * cause other rules to apply in turn.
+ * @param precedingRule The potentially triggering rule; not null.
+ * @return All rules that could be triggered by the given rule.
+ */
+ public Collection<Rule> getSuccessorsOf(Rule precedingRule) {
+ Preconditions.checkNotNull(precedingRule);
+ return successors.get(precedingRule);
+ }
+
+ /**
+ * Given a rule, return the set of all rules that could trigger it. That is,
+ * if any one of those rules were applied, their potential conclusions could
+ * directly cause the specified rule to apply in turn.
+ * @param dependentRule The potentially triggered rule; not null.
+ * @return All rules that could trigger the given rule.
+ */
+ public Collection<Rule> getPredecessorsOf(Rule dependentRule) {
+ Preconditions.checkNotNull(dependentRule);
+ return predecessors.get(dependentRule);
+ }
+
+ /**
+ * Given a pair of rules, determine whether a path exists from the first to
+ * the second. That is, whether the first rule precedes the second rule
+ * either directly or transitively. If either rule is null, no path exists.
+ * @param r1 The start of the path
+ * @param r2 The end of the path
+ * @return whether a forward path exists.
+ */
+ public boolean pathExists(Rule r1, Rule r2) {
+ if (r1 == null || r2 == null) {
+ return false;
+ }
+ Set<Rule> forwardFrontier = new HashSet<>();
+ Set<Rule> backwardFrontier = new HashSet<>();
+ Set<Rule> visitedForward = new HashSet<>();
+ Set<Rule> visitedBackward = new HashSet<>();
+ forwardFrontier.addAll(getSuccessorsOf(r1));
+ backwardFrontier.add(r2);
+ while (!forwardFrontier.isEmpty() && !backwardFrontier.isEmpty()) {
+ Set<Rule> currentGoals = new HashSet<>(backwardFrontier);
+ for (Rule goal : currentGoals) {
+ if (forwardFrontier.contains(goal)) {
+ return true;
+ }
+ else {
+ visitedBackward.add(goal);
+ backwardFrontier.addAll(getPredecessorsOf(goal));
+ }
+ }
+ backwardFrontier.removeAll(visitedBackward);
+ Set<Rule> currentSources = new HashSet<>(forwardFrontier);
+ for (Rule source : currentSources) {
+ if (backwardFrontier.contains(source)) {
+ return true;
+ }
+ else {
+ visitedForward.add(source);
+ forwardFrontier.addAll(getSuccessorsOf(source));
+ }
+ }
+ forwardFrontier.removeAll(visitedForward);
+ }
+ return false;
+ }
+
+ /**
+ * Whether the first rule can, in any circumstance, directly trigger the second.
+ * @param rule1 The first rule, which may produce some inferences
+ * @param rule2 The second rule, which may use the first rule's conclusions
+ * @return True if the first rule's conclusions could be used by the second.
+ */
+ private boolean canTrigger(Rule rule1, Rule rule2) {
+ if (rule1 == null || rule2 == null) {
+ return false;
+ }
+ for (StatementPattern antecedent : rule2.getAntecedentPatterns()) {
+ if (rule1.canConclude(antecedent)) {
+ return true;
+ }
+ }
+ return false;
+ }
+}
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/SpinConstructRule.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/SpinConstructRule.java
new file mode 100644
index 0000000..44e15e6
--- /dev/null
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/rule/SpinConstructRule.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.rule;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.domain.StatementMetadata;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.forwardchain.ForwardChainConstants;
+import org.apache.rya.forwardchain.ForwardChainException;
+import org.apache.rya.forwardchain.strategy.AbstractRuleExecutionStrategy;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.openrdf.model.Literal;
+import org.openrdf.model.Resource;
+import org.openrdf.model.Value;
+import org.openrdf.model.vocabulary.OWL;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.model.vocabulary.RDFS;
+import org.openrdf.model.vocabulary.SP;
+import org.openrdf.model.vocabulary.SPIN;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.MalformedQueryException;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.TupleQuery;
+import org.openrdf.query.TupleQueryResultHandlerBase;
+import org.openrdf.query.TupleQueryResultHandlerException;
+import org.openrdf.query.algebra.Extension;
+import org.openrdf.query.algebra.Join;
+import org.openrdf.query.algebra.QueryModelNode;
+import org.openrdf.query.algebra.SingletonSet;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.TupleExpr;
+import org.openrdf.query.algebra.UnaryTupleOperator;
+import org.openrdf.query.algebra.ValueExpr;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.helpers.QueryModelVisitorBase;
+import org.openrdf.query.parser.ParsedGraphQuery;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+/**
+ * Represents a SPIN Construct rule extracted from the data store, providing
+ * access to its associated query tree and providing methods to apply the rule.
+ */
+public class SpinConstructRule extends AbstractConstructRule {
+    private static Logger logger = Logger.getLogger(SpinConstructRule.class);
+
+    // URI or bnode identifying this rule in the data store.
+    private final Resource ruleId;
+    // Query tree for the rule's CONSTRUCT text; modified at construction time
+    // to incorporate the ?this type requirement.
+    private final ParsedGraphQuery graphQuery;
+    // Triple patterns whose matches could trigger this rule.
+    private Set<StatementPattern> antecedentStatementPatterns = null;
+    // Triple patterns covering every statement this rule could produce.
+    private Set<StatementPattern> consequentStatementPatterns = null;
+
+    /**
+     * Instantiate a SPIN construct rule given its associated type, URI or bnode
+     * identifier, and construct query tree. Modifies the query tree to
+     * incorporate the fact that ?this must belong to the associated type, and
+     * traverses the modified tree to find antecedent and consequent triple
+     * patterns.
+     * @param type This rule applies to objects of this type. Should not be
+     *  null. If the type is owl:Thing or rdfs:Resource, it will be applied to
+     *  any objects. Otherwise, a statement pattern will be added that
+     *  effectively binds ?this to members of the type. Therefore, passing
+     *  owl:Thing or rdfs:Resource yields the intended behavior of
+     *  sp:thisUnbound.
+     * @param ruleId The Resource representing this rule in the RDF data;
+     *  should not be null.
+     * @param graphQuery The query tree corresponding to the "construct" text;
+     *  should not be null.
+     */
+    public SpinConstructRule(Resource type, Resource ruleId,
+            ParsedGraphQuery graphQuery) {
+        Preconditions.checkNotNull(type);
+        Preconditions.checkNotNull(ruleId);
+        Preconditions.checkNotNull(graphQuery);
+        this.ruleId = ruleId;
+        this.graphQuery = graphQuery;
+        // Add the type requirement: ?this must belong to the type
+        graphQuery.getTupleExpr().visit(new TypeRequirementVisitor("this", type));
+        // Find all statement patterns that could trigger this rule
+        AntecedentVisitor aVisitor = new AntecedentVisitor();
+        graphQuery.getTupleExpr().visit(aVisitor);
+        antecedentStatementPatterns = aVisitor.getAntecedents();
+        // Construct statement patterns for all possible conclusions of this rule
+        ConstructConsequentVisitor cVisitor = new ConstructConsequentVisitor();
+        graphQuery.getTupleExpr().visit(cVisitor);
+        consequentStatementPatterns = cVisitor.getConsequents();
+    }
+
+    /**
+     * Get the URI or bnode associated with this rule in the data.
+     * @return The rule's identifier.
+     */
+    public Resource getId() {
+        return ruleId;
+    }
+
+    @Override
+    public String toString() {
+        return "SpinConstructRule{" + ruleId.stringValue() + "}";
+    }
+
+    @Override
+    public ParsedGraphQuery getQuery() {
+        return graphQuery;
+    }
+
+    /**
+     * Matches the given pattern against each consequent position-by-position:
+     * a position matches if either side is unbound (wildcard) or the two
+     * constants are equal.
+     */
+    @Override
+    public boolean canConclude(StatementPattern sp) {
+        Preconditions.checkNotNull(sp);
+        Value s1 = getVarValue(sp.getSubjectVar());
+        Value p1 = getVarValue(sp.getPredicateVar());
+        Value o1 = getVarValue(sp.getObjectVar());
+        Value c1 = getVarValue(sp.getContextVar());
+        for (StatementPattern consequent : consequentStatementPatterns) {
+            Value s2 = getVarValue(consequent.getSubjectVar());
+            Value p2 = getVarValue(consequent.getPredicateVar());
+            Value o2 = getVarValue(consequent.getObjectVar());
+            Value c2 = getVarValue(consequent.getContextVar());
+            if ((s1 == null || s2 == null || s1.equals(s2))
+                    && (p1 == null || p2 == null || p1.equals(p2))
+                    && (o1 == null || o2 == null || o1.equals(o2))
+                    && (c1 == null || c2 == null || c1.equals(c2))) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @Override
+    public Collection<StatementPattern> getAntecedentPatterns() {
+        return antecedentStatementPatterns;
+    }
+
+    @Override
+    public Collection<StatementPattern> getConsequentPatterns() {
+        return consequentStatementPatterns;
+    }
+
+    /**
+     * Executes the rule, first recording this rule's identifier in the
+     * derivation metadata so inferred statements can be traced back to it.
+     * Note: adds to the supplied metadata object (mutates the argument).
+     */
+    @Override
+    public long execute(AbstractRuleExecutionStrategy strategy,
+            StatementMetadata metadata) throws ForwardChainException {
+        metadata.addMetadata(ForwardChainConstants.RYA_DERIVATION_RULE,
+                RdfToRyaConversions.convertResource(ruleId));
+        return super.execute(strategy, metadata);
+    }
+
+    // Null-safe accessor: the var's constant value, or null if var is null or
+    // unbound.
+    private static Value getVarValue(Var var) {
+        return var == null ? null : var.getValue();
+    }
+
+    /**
+     * Rewrites the query tree so that the named variable (?this) must belong
+     * to a required type. If the type is a universal base type (owl:Thing or
+     * rdfs:Resource) no requirement is added.
+     */
+    private static class TypeRequirementVisitor extends QueryModelVisitorBase<RuntimeException> {
+        private static final Var RDF_TYPE_VAR = new Var("-const-" + RDF.TYPE.stringValue(), RDF.TYPE);
+        private static final Set<Resource> BASE_TYPES = Sets.newHashSet(RDFS.RESOURCE, OWL.THING);
+        static {
+            RDF_TYPE_VAR.setConstant(true);
+        }
+
+        private final String varName;
+        // The "?varName rdf:type <type>" pattern to insert, or null if the
+        // type is universal and no restriction is needed.
+        private final StatementPattern typeRequirement;
+        public TypeRequirementVisitor(String varName, Resource requiredType) {
+            final Var typeVar = new Var("-const-" + requiredType.stringValue(), requiredType);
+            typeVar.setConstant(true);
+            this.varName = varName;
+            if (BASE_TYPES.contains(requiredType)) {
+                this.typeRequirement = null;
+            }
+            else {
+                this.typeRequirement = new StatementPattern(new Var(varName), RDF_TYPE_VAR, typeVar);
+            }
+        }
+        @Override
+        public void meet(SingletonSet node) {
+            // An empty query body becomes just the type requirement itself.
+            if (typeRequirement != null) {
+                node.replaceWith(typeRequirement);
+            }
+        }
+        @Override
+        public void meet(Extension node) {
+            Set<String> argBindings = node.getArg().getBindingNames();
+            if (typeRequirement != null) {
+                // NOTE(review): this drops extension elements that (re)bind
+                // the target variable without a real source binding -- either
+                // no expression, or a non-constant var the argument doesn't
+                // bind -- presumably so the type requirement attaches to the
+                // underlying binding instead; confirm against SPIN semantics.
+                node.getElements().removeIf(elem -> {
+                    if (varName.equals(elem.getName())) {
+                        ValueExpr expr = elem.getExpr();
+                        if (expr == null) {
+                            return true;
+                        }
+                        else if (expr instanceof Var) {
+                            String fromName = ((Var) expr).getName();
+                            if (getVarValue((Var) expr) == null && !argBindings.contains(fromName)) {
+                                return true;
+                            }
+                        }
+                    }
+                    return false;
+                });
+                meetUnaryTupleOperator(node);
+            }
+        }
+        @Override
+        public void meetNode(QueryModelNode node) {
+            if (typeRequirement != null) {
+                // Join the requirement at the topmost node that binds varName;
+                // otherwise keep descending.
+                if (node instanceof TupleExpr && ((TupleExpr) node).getBindingNames().contains(varName)) {
+                    final Join withType = new Join((TupleExpr) node.clone(), typeRequirement);
+                    node.replaceWith(withType);
+                }
+                else {
+                    node.visitChildren(this);
+                }
+            }
+        }
+        @Override
+        public void meetUnaryTupleOperator(UnaryTupleOperator node) {
+            if (typeRequirement != null) {
+                // If the child still binds varName, push the rewrite down so
+                // the join lands below this operator; otherwise treat this
+                // node as the attachment point.
+                if (node.getArg().getBindingNames().contains(varName)) {
+                    node.visitChildren(this);
+                }
+                else {
+                    meetNode(node);
+                }
+            }
+        }
+    }
+
+    /**
+     * Load a set of SPIN rules from a data store.
+     * @param conf Contains the connection information. Not null.
+     * @return A map of rule identifiers to rule objects.
+     * @throws ForwardChainException if connecting, querying for rules, or
+     *  parsing rules fails.
+     */
+    public static Ruleset loadSpinRules(RdfCloudTripleStoreConfiguration conf)
+            throws ForwardChainException {
+        Preconditions.checkNotNull(conf);
+        Map<Resource, Rule> rules = new ConcurrentHashMap<>();
+        // Connect to Rya
+        SailRepository repository = null;
+        SailRepositoryConnection conn = null;
+        try {
+            repository = new SailRepository(RyaSailFactory.getInstance(conf));
+        } catch (Exception e) {
+            throw new ForwardChainException("Couldn't initialize SAIL from configuration", e);
+        }
+        // Load and parse the individual SPIN rules from the data store.
+        // Two forms are matched: a rule directly declared as an sp:Construct
+        // with sp:text, or a rule typed by a template whose spin:body is such
+        // a construct.
+        // NOTE(review): "<spin:body>?" is a zero-or-one property path, and
+        // SPIN.BODY_PROPERTY is concatenated without .stringValue() (URI
+        // toString yields the same text) -- confirm both are intentional.
+        String ruleQueryString = "SELECT ?type ?rule ?text WHERE {\n"
+                + "  ?type <" + SPIN.RULE_PROPERTY.stringValue() + "> ?rule .\n"
+                + "  {\n"
+                + "    ?rule a <" + SP.CONSTRUCT_CLASS.stringValue() + "> .\n"
+                + "    ?rule <" + SP.TEXT_PROPERTY.stringValue() + "> ?text .\n"
+                + "  } UNION {\n"
+                + "    ?rule a ?template .\n"
+                + "    ?template <" + SPIN.BODY_PROPERTY + ">? ?body .\n"
+                + "    ?body a <" + SP.CONSTRUCT_CLASS.stringValue() + "> .\n"
+                + "    ?body <" + SP.TEXT_PROPERTY.stringValue() + "> ?text .\n"
+                + "  }\n"
+                + "}";
+        SPARQLParser parser = new SPARQLParser();
+        try {
+            conn = repository.getConnection();
+            TupleQuery ruleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, ruleQueryString);
+            ruleQuery.evaluate(new TupleQueryResultHandlerBase() {
+                @Override
+                public void handleSolution(BindingSet bs) throws TupleQueryResultHandlerException {
+                    // For each rule identifier found, instantiate a SpinRule
+                    Value requiredType = bs.getValue("type");
+                    Value ruleIdentifier = bs.getValue("rule");
+                    Value ruleText = bs.getValue("text");
+                    if (requiredType instanceof Resource
+                            && ruleIdentifier instanceof Resource
+                            && ruleText instanceof Literal) {
+                        ParsedQuery parsedRule;
+                        try {
+                            parsedRule = parser.parseQuery(ruleText.stringValue(), null);
+                            if (parsedRule instanceof ParsedGraphQuery) {
+                                SpinConstructRule rule = new SpinConstructRule(
+                                        (Resource) requiredType,
+                                        (Resource) ruleIdentifier,
+                                        (ParsedGraphQuery) parsedRule);
+                                // Bnode-producing rules are skipped: repeated
+                                // application would mint new bnodes forever.
+                                if (rule.hasAnonymousConsequent()) {
+                                    logger.error("Skipping unsupported rule " + ruleIdentifier
+                                            + " -- consequent refers to bnode, which is not"
+                                            + " currently supported (creating new bnodes at each"
+                                            + " application could lead to infinite recursion).");
+                                }
+                                else {
+                                    rules.put((Resource) ruleIdentifier, rule);
+                                }
+                            }
+                        } catch (Exception e) {
+                            throw new TupleQueryResultHandlerException(e);
+                        }
+                    }
+                }
+            });
+        } catch (TupleQueryResultHandlerException | QueryEvaluationException
+                | MalformedQueryException | RepositoryException e) {
+            throw new ForwardChainException("Couldn't retrieve SPIN rules", e);
+        }
+        finally {
+            // Best-effort cleanup: log but don't propagate close failures.
+            if (conn != null) {
+                try {
+                    conn.close();
+                } catch (RepositoryException e) {
+                    logger.warn("Error closing repository connection", e);
+                }
+            }
+            if (repository.isInitialized()) {
+                try {
+                    repository.shutDown();
+                } catch (RepositoryException e) {
+                    logger.warn("Error shutting down repository", e);
+                }
+            }
+        }
+        return new Ruleset(rules.values());
+    }
+}
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/AbstractForwardChainStrategy.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/AbstractForwardChainStrategy.java
new file mode 100644
index 0000000..fb0314e
--- /dev/null
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/AbstractForwardChainStrategy.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.strategy;
+
+import org.apache.rya.forwardchain.ForwardChainException;
+import org.apache.rya.forwardchain.rule.Ruleset;
+
+/**
+ * Base class for high-level strategies which define how to conduct
+ * forward-chaining reasoning (materialization).
+ */
+public abstract class AbstractForwardChainStrategy {
+ /**
+ * A running count of new inferences so far.
+ */
+ protected long totalInferences;
+
+ /**
+ * Initializes reasoning with respect to a given ruleset.
+ * @param ruleset The complete set of rules to materialize. Should not be
+ * null.
+ * @throws ForwardChainException if initialization fails.
+ */
+ abstract public void initialize(Ruleset ruleset) throws ForwardChainException;
+
+ /**
+ * Whether forward chaining is both initialized and yet to finish.
+ * @return true if a ruleset has been provided and some rules may still
+ * yield new information.
+ */
+ abstract protected boolean isActive();
+
+ /**
+ * Execute the next step of reasoning, such as a single rule if the strategy
+ * proceeds one rule at a time.
+ * @return The number of inferences made during this step.
+ * @throws ForwardChainException if any error is encountered during rule
+ * application.
+ */
+ abstract protected long executeNext() throws ForwardChainException;
+
+ /**
+ * Execute an entire ruleset until no new rules can be derived. Initializes
+ * strategy and proceeds until completion.
+ * @param rules The complete set of rules; not null.
+ * @return The number of total inferences made.
+ * @throws ForwardChainException if any error is encountered during
+ * initialization or application.
+ */
+ public long executeAll(Ruleset rules) throws ForwardChainException {
+ initialize(rules);
+ totalInferences = 0;
+ while (isActive()) {
+ totalInferences += executeNext();
+ }
+ return totalInferences;
+ }
+
+ /**
+ * Get the running total of inferences made so far.
+ * @return The number of inferences made since initialization.
+ */
+ public long getNumInferences() {
+ return totalInferences;
+ }
+}
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/AbstractRuleExecutionStrategy.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/AbstractRuleExecutionStrategy.java
new file mode 100644
index 0000000..24c8de9
--- /dev/null
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/AbstractRuleExecutionStrategy.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.strategy;
+
+import org.apache.rya.api.domain.StatementMetadata;
+import org.apache.rya.forwardchain.ForwardChainException;
+
+import org.apache.rya.forwardchain.rule.AbstractConstructRule;
+import org.apache.rya.forwardchain.rule.AbstractInconsistencyRule;
+import org.apache.rya.forwardchain.rule.AbstractUpdateRule;
+
+/**
+ * Base class for rule application strategies, which can execute a single
+ * forward chaining (materialization) rule at a time. Subclasses may provide
+ * implementations of methods to execute whichever they support of construct
+ * rules, update rules, and inconsistency rules. The default behavior for all
+ * kinds is to throw an {@link UnsupportedOperationException}.
+ */
+public abstract class AbstractRuleExecutionStrategy {
+ protected int requiredLevel = 0;
+
+ /**
+ * Execute a rule corresponding to a "CONSTRUCT" query. Throws an
+ * UnsupportedOperationException if not explicitly overridden.
+ * @param rule The construct rule to apply; assumed not null.
+ * @param metadata Additional metadata to add to any inferred triples;
+ * assumed not null.
+ * @return The number of inferred triples. Higher-level forward chaining
+ * strategies may rely on the accuracy of this number.
+ * @throws ForwardChainException if execution failed.
+ */
+ public long executeConstructRule(AbstractConstructRule rule,
+ StatementMetadata metadata) throws ForwardChainException {
+ throw new UnsupportedOperationException("Rule execution strategy does not support construct rules.");
+ };
+
+ /**
+ * Execute a rule corresponding to an update query. Throws an
+ * UnsupportedOperationException if not explicitly overridden.
+ * @param rule The update rule to apply; assumed not null.
+ * @param metadata Additional metadata to add to any updated triples;
+ * assumed not null.
+ * @return The number of inferences made. Higher-level forward chaining
+ * strategies may rely on the accuracy of this number.
+ * @throws ForwardChainException if execution failed.
+ */
+ public long executeUpdateRule(AbstractUpdateRule rule,
+ StatementMetadata metadata) throws ForwardChainException {
+ throw new UnsupportedOperationException("Rule execution strategy does not support update rules.");
+ };
+
+ /**
+ * Execute a rule capable of detecting inconsistencies. Throws an
+ * UnsupportedOperationException if not explicitly overridden.
+ * @param rule The inconsistency rule to apply; assumed not null.
+ * @param metadata Additional metadata associated with inconsistencies;
+ * assumed not null.
+ * @return The number of inconsistencies found.
+ * @throws ForwardChainException if execution failed.
+ */
+ public long executeInconsistencyRule(AbstractInconsistencyRule rule,
+ StatementMetadata metadata) throws ForwardChainException {
+ throw new UnsupportedOperationException("Rule execution strategy does not perform inconsistency detection.");
+ }
+
+ /**
+ * Initialize the strategy and make any preparations for executing rules.
+ * Does nothing by default; subclasses should override if necessary.
+ * @throws ForwardChainException
+ */
+ public void initialize() throws ForwardChainException { };
+
+ /**
+ * Shut down the strategy and perform any appropriate cleanup. Does nothing
+ * by default; subclasses should override if necessary.
+ * @throws ForwardChainException
+ */
+ public void shutDown() throws ForwardChainException { }
+
+ /**
+ * Indicate that a rule need only be applied if one of the source statements
+     * is at least this derivation level, i.e. took this many steps to derive
+ * itself. Subclasses may use this for optimization, but are not guaranteed
+ * to.
+ * @param derivationLevel Forward chaining level of statements that should
+ * be used to trigger rules. If not set, defaults to zero which should have
+ * no effect.
+ */
+ public void setRequiredLevel(int derivationLevel) {
+ this.requiredLevel = derivationLevel;
+ };
+}
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/MongoPipelineStrategy.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/MongoPipelineStrategy.java
new file mode 100644
index 0000000..c095122
--- /dev/null
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/MongoPipelineStrategy.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.strategy;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.StatementMetadata;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.api.persist.query.RyaQuery;
+import org.apache.rya.api.persist.query.RyaQueryEngine;
+import org.apache.rya.forwardchain.ForwardChainException;
+import org.apache.rya.forwardchain.rule.AbstractConstructRule;
+import org.apache.rya.forwardchain.rule.Rule;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.mongodb.MongoDBRyaDAO;
+import org.apache.rya.mongodb.StatefulMongoDBRdfConfiguration;
+import org.apache.rya.mongodb.aggregation.AggregationPipelineQueryNode;
+import org.apache.rya.mongodb.aggregation.SparqlToPipelineTransformVisitor;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriter;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterConfig;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterException;
+import org.apache.rya.mongodb.batch.MongoDbBatchWriterUtils;
+import org.apache.rya.mongodb.batch.collection.CollectionType;
+import org.apache.rya.mongodb.batch.collection.MongoCollectionType;
+import org.apache.rya.mongodb.dao.SimpleMongoDBStorageStrategy;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.bson.Document;
+import org.bson.conversions.Bson;
+import org.openrdf.query.algebra.QueryRoot;
+import org.openrdf.query.algebra.TupleExpr;
+
+import com.google.common.base.Preconditions;
+import com.mongodb.Block;
+import com.mongodb.DBObject;
+import com.mongodb.MongoClient;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.MongoDatabase;
+import com.mongodb.util.JSON;
+
+/**
+ * A rule execution strategy for MongoDB Rya that converts a single rule into an
+ * aggregation pipeline whenever possible. Falls back on an internal
+ * {@link SailExecutionStrategy} to handle any rules that can't be converted.
+ */
+public class MongoPipelineStrategy extends AbstractRuleExecutionStrategy {
+ private static final Logger logger = Logger.getLogger(MongoPipelineStrategy.class);
+
+ private static final int PIPELINE_BATCH_SIZE = 1000;
+
+ private final SparqlToPipelineTransformVisitor pipelineVisitor;
+ private final MongoCollection<Document> baseCollection;
+ private final MongoDbBatchWriter<Document> batchWriter;
+ private final MongoDBRyaDAO dao;
+ private final SimpleMongoDBStorageStrategy storageStrategy = new SimpleMongoDBStorageStrategy();
+ private final ConcurrentHashMap<Rule, Long> executionTimes = new ConcurrentHashMap<>();
+ private final AbstractRuleExecutionStrategy backup;
+ private final RyaQueryEngine<StatefulMongoDBRdfConfiguration> engine;
+ private boolean usedBackup = false;
+
+ /**
+ * Initialize based on a configuration.
+ * @param mongoConf Should contain database information; cannot be null. If
+ * passed a stateful configuration, uses the existing mongo client,
+ * otherwise creates one.
+ */
+ public MongoPipelineStrategy(MongoDBRdfConfiguration mongoConf) throws ForwardChainException {
+ Preconditions.checkNotNull(mongoConf);
+ final String mongoDBName = mongoConf.getMongoDBName();
+ final String collectionName = mongoConf.getTriplesCollectionName();
+ mongoConf.setFlush(false);
+ final StatefulMongoDBRdfConfiguration statefulConf;
+ try {
+ if (mongoConf instanceof StatefulMongoDBRdfConfiguration) {
+ statefulConf = (StatefulMongoDBRdfConfiguration) mongoConf;
+ this.dao = new MongoDBRyaDAO();
+ this.dao.setConf(statefulConf);
+ this.dao.init();
+ }
+ else {
+ this.dao = RyaSailFactory.getMongoDAO(mongoConf);
+ statefulConf = this.dao.getConf();
+ }
+ } catch (RyaDAOException e) {
+ throw new ForwardChainException("Can't connect to Rya.", e);
+ }
+ final MongoClient mongoClient = statefulConf.getMongoClient();
+ final MongoDatabase mongoDB = mongoClient.getDatabase(mongoDBName);
+ this.baseCollection = mongoDB.getCollection(collectionName);
+ this.pipelineVisitor = new SparqlToPipelineTransformVisitor(this.baseCollection);
+ this.engine = this.dao.getQueryEngine();
+ this.backup = new SailExecutionStrategy(statefulConf);
+ final MongoDbBatchWriterConfig writerConfig = MongoDbBatchWriterUtils.getMongoDbBatchWriterConfig(statefulConf);
+ final CollectionType<Document> ct = new MongoCollectionType(baseCollection);
+ this.batchWriter = new MongoDbBatchWriter<>(ct, writerConfig);
+ try {
+ this.batchWriter.start();
+ } catch (final MongoDbBatchWriterException e) {
+ throw new ForwardChainException("Error starting MongoDB batch writer", e);
+ }
+ }
+
+ /**
+ * Execute a CONSTRUCT rule by converting it into a pipeline, iterating
+ * through the resulting documents, and inserting them back to the data
+ * store as new triples. If pipeline conversion fails, falls back on
+ * default execution strategy.
+ * @param rule A construct query rule; not null.
+ * @param metadata StatementMetadata to attach to new triples; not null.
+ * @return The number of new triples inferred.
+ * @throws ForwardChainException if execution fails.
+ */
+ @Override
+ public long executeConstructRule(AbstractConstructRule rule,
+ StatementMetadata metadata) throws ForwardChainException {
+ Preconditions.checkNotNull(rule);
+ logger.info("Applying inference rule " + rule + "...");
+ long timestamp = System.currentTimeMillis();
+ // Get a pipeline that turns individual matches into triples
+ List<Bson> pipeline = null;
+ try {
+ int requireSourceLevel = 0;
+ if (!usedBackup) {
+ // If we can assume derivation levels are set properly, we can optimize by
+ // pruning any derived fact whose sources are all old information. (i.e. we can
+ // infer that the pruned fact would have already been derived in a previous
+ // step.) But if the backup strategy has ever been used, the source triples aren't
+ // guaranteed to have derivation level set.
+ requireSourceLevel = requiredLevel;
+ }
+ pipeline = toPipeline(rule, requireSourceLevel, timestamp);
+ }
+ catch (ForwardChainException e) {
+ logger.error(e);
+ }
+ if (pipeline == null) {
+ if (backup == null) {
+ logger.error("Couldn't convert " + rule + " to pipeline:");
+ for (String line : rule.getQuery().toString().split("\n")) {
+ logger.error("\t" + line);
+ }
+ throw new UnsupportedOperationException("Couldn't convert query to pipeline.");
+ }
+ else {
+ logger.debug("Couldn't convert " + rule + " to pipeline:");
+ for (String line : rule.getQuery().toString().split("\n")) {
+ logger.debug("\t" + line);
+ }
+ logger.debug("Using fallback strategy.");
+ usedBackup = true;
+ return backup.executeConstructRule(rule, metadata);
+ }
+ }
+ // Execute the pipeline
+ for (Bson step : pipeline) {
+ logger.debug("\t" + step.toString());
+ }
+ LongAdder count = new LongAdder();
+ baseCollection.aggregate(pipeline)
+ .allowDiskUse(true)
+ .batchSize(PIPELINE_BATCH_SIZE)
+ .forEach(new Block<Document>() {
+ @Override
+ public void apply(Document doc) {
+ final DBObject dbo = (DBObject) JSON.parse(doc.toJson());
+ RyaStatement rstmt = storageStrategy.deserializeDBObject(dbo);
+ if (!statementExists(rstmt)) {
+ count.increment();
+ doc.replace(SimpleMongoDBStorageStrategy.STATEMENT_METADATA, metadata.toString());
+ try {
+ batchWriter.addObjectToQueue(doc);
+ } catch (MongoDbBatchWriterException e) {
+ logger.error("Couldn't insert " + rstmt, e);
+ }
+ }
+ }
+ });
+ try {
+ batchWriter.flush();
+ } catch (MongoDbBatchWriterException e) {
+ throw new ForwardChainException("Error writing to Mongo", e);
+ }
+ logger.info("Added " + count + " new statements.");
+ executionTimes.compute(rule, (r, previous) -> {
+ if (previous != null && previous > timestamp) {
+ return previous;
+ }
+ else {
+ return timestamp;
+ }
+ });
+ return count.longValue();
+ }
+
+ private boolean statementExists(RyaStatement rstmt) {
+ try {
+ return engine.query(new RyaQuery(rstmt)).iterator().hasNext();
+ } catch (RyaDAOException e) {
+ logger.error("Error querying for " + rstmt, e);
+ return false;
+ }
+ }
+
+ /**
+ * Flush and close the batch writer, and shut down the backup
+ * SailExecutionStrategy.
+ * @throws ForwardChainException if the batch writer or backup strategy
+ * throw any errors.
+ */
+ @Override
+ public void shutDown() throws ForwardChainException {
+ backup.shutDown();
+ try {
+ batchWriter.shutdown();
+ } catch (MongoDbBatchWriterException e) {
+ throw new ForwardChainException("Error shutting down batch writer", e);
+ }
+ }
+
+ /**
+ * Converts a construct rule into a series of documents representing
+ * aggregation pipeline steps.
+ * @param rule A construct query rule.
+ * @param sourceLevel Only make derivations whose source triples have this
+ * derivation level or higher, i.e. took some number of forward chaining
+ * steps to infer. Set to zero to skip this check.
+ * @param timestamp Timestamp to be set for all inferred triples.
+ * @return An aggregation pipeline.
+ * @throws ForwardChainException if pipeline construction fails.
+ */
+ private List<Bson> toPipeline(AbstractConstructRule rule, int sourceLevel,
+ long timestamp) throws ForwardChainException {
+ TupleExpr tupleExpr = rule.getQuery().getTupleExpr();
+ if (!(tupleExpr instanceof QueryRoot)) {
+ tupleExpr = new QueryRoot(tupleExpr);
+ }
+ try {
+ tupleExpr.visit(pipelineVisitor);
+ } catch (Exception e) {
+ throw new ForwardChainException("Error converting construct rule to an aggregation pipeline", e);
+ }
+ if (tupleExpr instanceof QueryRoot) {
+ QueryRoot root = (QueryRoot) tupleExpr;
+ if (root.getArg() instanceof AggregationPipelineQueryNode) {
+ AggregationPipelineQueryNode pipelineNode = (AggregationPipelineQueryNode) root.getArg();
+ pipelineNode.distinct(); // require distinct triples
+ pipelineNode.requireSourceDerivationDepth(sourceLevel);
+ long latestTime = executionTimes.getOrDefault(rule, 0L);
+ if (latestTime > 0) {
+ pipelineNode.requireSourceTimestamp(latestTime);
+ }
+ return pipelineNode.getTriplePipeline(timestamp, false);
+ }
+ }
+ return null;
+ }
+}
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/RoundRobinStrategy.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/RoundRobinStrategy.java
new file mode 100644
index 0000000..eb044fc
--- /dev/null
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/RoundRobinStrategy.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.strategy;
+
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.log4j.Logger;
+import org.apache.rya.api.domain.RyaType;
+import org.apache.rya.api.domain.StatementMetadata;
+import org.apache.rya.forwardchain.ForwardChainConstants;
+import org.apache.rya.forwardchain.ForwardChainException;
+import org.apache.rya.forwardchain.rule.Rule;
+import org.apache.rya.forwardchain.rule.Ruleset;
+import org.openrdf.model.vocabulary.XMLSchema;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A simple {@link AbstractForwardChainStrategy} that iterates over every
+ * relevant rule once, and repeats until no rules are relevant.
+ * <p>
+ * Initially, all rules are considered relevant. Iteration 1 executes each rule
+ * once.
+ * <p>
+ * When a rule produces inferences, all rules that are marked as that rule's
+ * successors according to the {@link Ruleset} are triggered as potentially
+ * relevant for future execution. If a triggered rule is scheduled to be
+ * executed during the current iteration, nothing changes. If a triggered rule
+ * has already been executed once during the current iteration, or was not
+ * activated for the current iteration at all, it is flagged to be executed
+ * during the next iteration.
+ * <p>
+ * When an iteration concludes, a new iteration begins with the relevant set of
+ * rules having been determined during the previous iteration. If there are no
+ * such rules, forward chaining ends.
+ * <p>
+ * Within each iteration, rules are processed such that a rule which may trigger
+ * many other rules is given priority over a rule that may be triggered by many
+ * other rules.
+ * <p>
+ * The observation that one rule may trigger another is based on the
+ * relationships between triple patterns produced and consumed by the rules in
+ * general, not based on any triples that were actually generated. Therefore,
+ * there may be false positives but not false negatives: Rules triggered by the
+ * current rule may or may not produce more triples in response, but any rule
+ * that could produce triples in response will be triggered.
+ * <p>
+ * The procedure for executing the individual rules is governed by the
+ * {@link AbstractRuleExecutionStrategy}. This class uses the strategy's reported counts
+ * to determine whether or not a rule has produced inferences.
+ */
+public class RoundRobinStrategy extends AbstractForwardChainStrategy {
+ private static final Logger logger = Logger.getLogger(RoundRobinStrategy.class);
+
+ private final AbstractRuleExecutionStrategy ruleStrategy;
+ private int iteration;
+ private Ruleset ruleset;
+ private Set<Rule> activeNow;
+ private Set<Rule> activeNextIteration;
+ private long inferencesThisIteration;
+ private AtomicBoolean initialized = new AtomicBoolean(false);
+
+ /**
+ * Instantiate a RoundRobinStrategy by providing the RuleExecutionStrategy.
+ * @param ruleStrategy Defines how to execute individual rules; not null.
+ */
+ public RoundRobinStrategy(AbstractRuleExecutionStrategy ruleStrategy) {
+ Preconditions.checkNotNull(ruleStrategy);
+ this.ruleStrategy = ruleStrategy;
+ }
+
+ @Override
+ public void initialize(Ruleset withRuleset) throws ForwardChainException {
+ Preconditions.checkNotNull(withRuleset);
+ iteration = 0;
+ ruleset = withRuleset;
+ activeNow = new HashSet<>();
+ activeNextIteration = new HashSet<>(ruleset.getRules());
+ logger.info("Initializing round robin forward chaining, with " +
+ activeNextIteration.size() + " rules.");
+ initialized.set(true);
+ prepareQueue();
+ }
+
+ private void prepareQueue() throws ForwardChainException {
+ if (initialized.get()) {
+ if (activeNow.isEmpty()) {
+ if (iteration > 0) {
+ logger.info("Finished iteration " + iteration + "; made " +
+ inferencesThisIteration + " inferences.");
+ }
+ if (activeNextIteration.isEmpty()) {
+ logger.info("Finished forward chaining after " + iteration + " iterations.");
+ setDone();
+ }
+ else {
+ ruleStrategy.setRequiredLevel(iteration);
+ iteration++;
+ inferencesThisIteration = 0;
+ activeNow.addAll(activeNextIteration);
+ activeNextIteration.clear();
+ logger.info("Beginning iteration " + iteration + ", with " +
+ activeNow.size() + " rules to execute...");
+ }
+ }
+ }
+ }
+
+ private void setDone() throws ForwardChainException {
+ initialized.set(false);
+ if (ruleStrategy != null) {
+ ruleStrategy.shutDown();
+ }
+ }
+
+ @Override
+ public boolean isActive() {
+ return initialized.get();
+ }
+
+ @Override
+ public long executeNext() throws ForwardChainException {
+ if (!initialized.get()) {
+ return 0;
+ }
+ Rule rule = getNextRule();
+ if (rule == null) {
+ return 0;
+ }
+ StatementMetadata metadata = new StatementMetadata();
+ metadata.addMetadata(ForwardChainConstants.RYA_DERIVATION_TIME,
+ new RyaType(XMLSchema.INT, Integer.toString(iteration)));
+ long inferences = rule.execute(ruleStrategy, metadata);
+ inferencesThisIteration += inferences;
+ if (inferences > 0) {
+ for (Rule successor : ruleset.getSuccessorsOf(rule)) {
+ // If we'll handle the triggered rule in the current iteration,
+ // it may not need to be checked in the next one.
+ if (!activeNow.contains(successor)) {
+ activeNextIteration.add(successor);
+ }
+ }
+ }
+ prepareQueue();
+ return inferences;
+ }
+
+ private Rule getNextRule() {
+ if (activeNow.isEmpty()) {
+ return null;
+ }
+ Ruleset subset = new Ruleset(activeNow);
+ SortedSet<Rule> sorted = new TreeSet<>(new Comparator<Rule>() {
+ @Override
+ public int compare(Rule r1, Rule r2) {
+ // If one rule triggers the other (directly or indirectly) but
+ // not the other way around, the one that triggers the other
+ // should come first.
+ boolean forwardPath = subset.pathExists(r1, r2);
+ boolean backwardPath = subset.pathExists(r2, r1);
+ if (forwardPath && !backwardPath) {
+ return -1;
+ }
+ if (backwardPath && !forwardPath) {
+ return 1;
+ }
+ return 0;
+ }
+ }.thenComparingInt(rule -> {
+ // Otherwise, prioritize rules that trigger many remaining rules,
+ // and defer rules that can be triggered by many remaining rules.
+ return remainingPredecessors(rule).size() - remainingSuccessors(rule).size();
+ }).thenComparing(Rule::toString)); // Fall back on string comparison
+ sorted.addAll(activeNow);
+ Rule next = sorted.first();
+ activeNow.remove(next);
+ return next;
+ }
+
+ private Set<Rule> remainingSuccessors(Rule rule) {
+ Set<Rule> successors = new HashSet<>(ruleset.getSuccessorsOf(rule));
+ successors.retainAll(activeNow);
+ return successors;
+ }
+
+ private Set<Rule> remainingPredecessors(Rule rule) {
+ Set<Rule> predecessors = new HashSet<>(ruleset.getPredecessorsOf(rule));
+ predecessors.retainAll(activeNow);
+ return predecessors;
+ }
+}
diff --git a/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/SailExecutionStrategy.java b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/SailExecutionStrategy.java
new file mode 100644
index 0000000..d09c50c
--- /dev/null
+++ b/extras/rya.forwardchain/src/main/java/org/apache/rya/forwardchain/strategy/SailExecutionStrategy.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.strategy;
+
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
+import org.apache.log4j.Logger;
+import org.apache.rya.accumulo.AccumuloRdfConfiguration;
+import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
+import org.apache.rya.api.domain.RyaStatement;
+import org.apache.rya.api.domain.StatementMetadata;
+import org.apache.rya.api.persist.RyaDAO;
+import org.apache.rya.api.persist.RyaDAOException;
+import org.apache.rya.api.persist.query.RyaQuery;
+import org.apache.rya.api.persist.query.RyaQueryEngine;
+import org.apache.rya.api.resolver.RdfToRyaConversions;
+import org.apache.rya.forwardchain.ForwardChainException;
+import org.apache.rya.forwardchain.rule.AbstractConstructRule;
+import org.apache.rya.indexing.accumulo.ConfigUtils;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.calrissian.mango.collect.CloseableIterable;
+import org.openrdf.model.Statement;
+import org.openrdf.query.GraphQuery;
+import org.openrdf.query.QueryEvaluationException;
+import org.openrdf.query.parser.ParsedGraphQuery;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailGraphQuery;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+import org.openrdf.rio.RDFHandlerException;
+import org.openrdf.rio.helpers.RDFHandlerBase;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A naive but back-end-agnostic rule execution strategy that applies a
+ * construct rule by submitting the associated CONSTRUCT query to a Rya SAIL,
+ * then converting each statement produced by the query into a RyaStatement
+ * and inserting it into a Rya DAO.
+ */
+public class SailExecutionStrategy extends AbstractRuleExecutionStrategy {
+ private static final Logger logger = Logger.getLogger(SailExecutionStrategy.class);
+
+ private final RdfCloudTripleStoreConfiguration conf;
+
+ private SailRepository repo = null;
+ private SailRepositoryConnection conn = null;
+ private RyaDAO<?> dao = null;
+ private boolean initialized = false;
+
+ /**
+ * Initialize a SailExecutionStrategy with the given configuration.
+ * @param conf Defines Rya connection and query parameters; not null.
+ */
+ public SailExecutionStrategy(RdfCloudTripleStoreConfiguration conf) {
+ Preconditions.checkNotNull(conf);
+ this.conf = conf;
+ }
+
+ /**
+ * Executes a CONSTRUCT query through the SAIL and inserts the results into
+ * the DAO.
+ * @param rule A construct query; not null.
+ * @param metadata Metadata to add to any inferred triples; not null.
+ * @return The number of inferred triples.
+ * @throws ForwardChainException if query execution or data insert fails.
+ */
+ @Override
+ public long executeConstructRule(AbstractConstructRule rule,
+ StatementMetadata metadata) throws ForwardChainException {
+ Preconditions.checkNotNull(rule);
+ Preconditions.checkNotNull(metadata);
+ if (!initialized) {
+ initialize();
+ }
+ ParsedGraphQuery graphQuery = rule.getQuery();
+ long statementsAdded = 0;
+ logger.info("Applying inference rule " + rule + "...");
+ for (String line : graphQuery.getTupleExpr().toString().split("\n")) {
+ logger.debug("\t" + line);
+ }
+ InferredStatementHandler<?> handler = new InferredStatementHandler<>(dao, metadata);
+ try {
+ GraphQuery executableQuery = new SailGraphQuery(graphQuery, conn) { };
+ executableQuery.evaluate(handler);
+ statementsAdded = handler.getNumStatementsAdded();
+ logger.info("Added " + statementsAdded + " inferred statements.");
+ return statementsAdded;
+ } catch (QueryEvaluationException e) {
+ throw new ForwardChainException("Error evaluating query portion of construct rule", e);
+ } catch (RDFHandlerException e) {
+ throw new ForwardChainException("Error processing results of construct rule", e);
+ }
+ }
+
+ /**
+ * Connect to the Rya SAIL. If a DAO wasn't provided, instantiate one from
+ * the configuration.
+ * @throws ForwardChainException if connecting fails.
+ */
+ @Override
+ public void initialize() throws ForwardChainException {
+ try {
+ if (dao == null) {
+ dao = getDAO();
+ }
+ repo = new SailRepository(RyaSailFactory.getInstance(conf));
+ conn = repo.getConnection();
+ initialized = true;
+ } catch (Exception e) {
+ shutDown();
+ throw new ForwardChainException("Error connecting to SAIL", e);
+ }
+ }
+
+ private RyaDAO<?> getDAO() throws RyaDAOException, ForwardChainException {
+ if (ConfigUtils.getUseMongo(conf)) {
+ MongoDBRdfConfiguration mongoConf;
+ if (conf instanceof MongoDBRdfConfiguration) {
+ mongoConf = (MongoDBRdfConfiguration) conf;
+ }
+ else {
+ mongoConf = new MongoDBRdfConfiguration(conf);
+ }
+ return RyaSailFactory.getMongoDAO(mongoConf);
+ }
+ else {
+ AccumuloRdfConfiguration accumuloConf;
+ if (conf instanceof AccumuloRdfConfiguration) {
+ accumuloConf = (AccumuloRdfConfiguration) conf;
+ }
+ else {
+ accumuloConf = new AccumuloRdfConfiguration(conf);
+ }
+ try {
+ return RyaSailFactory.getAccumuloDAO(accumuloConf);
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ throw new ForwardChainException(e);
+ }
+ }
+ }
+
+ /**
+ * Shut down the SAIL connection objects.
+ */
+ @Override
+ public void shutDown() {
+ initialized = false;
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (RepositoryException e) {
+ logger.warn("Error closing SailRepositoryConnection", e);
+ }
+ }
+ if (repo != null && repo.isInitialized()) {
+ try {
+ repo.shutDown();
+ } catch (RepositoryException e) {
+ logger.warn("Error shutting down SailRepository", e);
+ }
+ }
+ try {
+ if (dao != null && dao.isInitialized()) {
+ dao.flush();
+ }
+ } catch (RyaDAOException e) {
+ logger.warn("Error flushing DAO", e);
+ }
+ }
+
+ private static class InferredStatementHandler<T extends RdfCloudTripleStoreConfiguration> extends RDFHandlerBase {
+ private RyaDAO<T> dao;
+ private RyaQueryEngine<T> engine;
+ private long numStatementsAdded = 0;
+ private StatementMetadata metadata;
+
+ InferredStatementHandler(RyaDAO<T> dao, StatementMetadata metadata) {
+ this.dao = dao;
+ this.engine = dao.getQueryEngine();
+ this.metadata = metadata;
+ this.engine.setConf(dao.getConf());
+ }
+
+ @Override
+ public void handleStatement(Statement statement) {
+ RyaStatement ryaStatement = RdfToRyaConversions.convertStatement(statement);
+ ryaStatement.setStatementMetadata(metadata);
+ try {
+ // Need to check whether the statement already exists, because
+ // we need an accurate count of newly added statements.
+ CloseableIterable<RyaStatement> iter = engine.query(new RyaQuery(ryaStatement));
+ if (!iter.iterator().hasNext()) {
+ dao.add(ryaStatement);
+ numStatementsAdded++;
+ }
+ } catch (RyaDAOException e) {
+ logger.error("Error handling inferred statement", e);
+ }
+ }
+
+ public long getNumStatementsAdded() {
+ return numStatementsAdded;
+ }
+ }
+}
diff --git a/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/batch/MongoSpinIT.java b/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/batch/MongoSpinIT.java
new file mode 100644
index 0000000..c70a025
--- /dev/null
+++ b/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/batch/MongoSpinIT.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.batch;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.rya.indexing.mongodb.MongoIndexingConfiguration;
+import org.apache.rya.indexing.mongodb.MongoIndexingConfiguration.MongoDBIndexingConfigBuilder;
+import org.apache.rya.mongodb.EmbeddedMongoFactory;
+import org.apache.rya.mongodb.MongoDBRdfConfiguration;
+import org.apache.rya.sail.config.RyaSailFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.query.BindingSet;
+import org.openrdf.query.QueryLanguage;
+import org.openrdf.query.TupleQuery;
+import org.openrdf.query.TupleQueryResult;
+import org.openrdf.query.impl.ListBindingSet;
+import org.openrdf.repository.RepositoryException;
+import org.openrdf.repository.sail.SailRepository;
+import org.openrdf.repository.sail.SailRepositoryConnection;
+import org.openrdf.rio.RDFFormat;
+import org.openrdf.rio.Rio;
+
+import com.google.common.io.Resources;
+import com.mongodb.MongoClient;
+import com.mongodb.ServerAddress;
+
+public class MongoSpinIT {
+ private static final ValueFactory VF = ValueFactoryImpl.getInstance();
+ private static final String EX = "http://example.org/";
+
+ private MongoDBRdfConfiguration conf;
+ private SailRepository repository;
+
+ @Before
+ public void setup() throws Exception {
+ Logger.getLogger("org.apache.rya.mongodb").setLevel(Level.WARN);
+ Logger.getLogger("org.apache.rya.forwardchain").setLevel(Level.INFO);
+ conf = getConf();
+ repository = new SailRepository(RyaSailFactory.getInstance(conf));
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (repository != null) {
+ try {
+ repository.shutDown();
+ } catch (final RepositoryException e) {
+ // quietly absorb this exception
+ }
+ }
+ }
+
+ @Test
+ public void testSailStrategy() throws Exception {
+ insertDataFile(Resources.getResource("data.ttl"), "http://example.org#");
+ insertDataFile(Resources.getResource("university.ttl"), "http://example.org#");
+ insertDataFile(Resources.getResource("owlrl.ttl"), "http://example.org#");
+ Set<BindingSet> solutions = executeQuery(Resources.getResource("query.sparql"));
+ Set<BindingSet> expected = new HashSet<>();
+ Assert.assertEquals(expected, solutions);
+ conf.setUseAggregationPipeline(false);
+ ForwardChainSpinTool tool = new ForwardChainSpinTool();
+ ToolRunner.run(conf, tool, new String[] {});
+ solutions = executeQuery(Resources.getResource("query.sparql"));
+ expected.add(new ListBindingSet(Arrays.asList("X", "Y"),
+ VF.createURI(EX, "Alice"), VF.createURI(EX, "Department1")));
+ Assert.assertEquals(expected, solutions);
+ Assert.assertEquals(24, tool.getNumInferences());
+ }
+
+ @Test
+ public void testPipelineStrategy() throws Exception {
+ insertDataFile(Resources.getResource("data.ttl"), "http://example.org#");
+ insertDataFile(Resources.getResource("university.ttl"), "http://example.org#");
+ insertDataFile(Resources.getResource("owlrl.ttl"), "http://example.org#");
+ Set<BindingSet> solutions = executeQuery(Resources.getResource("query.sparql"));
+ Set<BindingSet> expected = new HashSet<>();
+ Assert.assertEquals(expected, solutions);
+ conf.setUseAggregationPipeline(true);
+ ForwardChainSpinTool tool = new ForwardChainSpinTool();
+ ToolRunner.run(conf, tool, new String[] {});
+ solutions = executeQuery(Resources.getResource("query.sparql"));
+ expected.add(new ListBindingSet(Arrays.asList("X", "Y"),
+ VF.createURI(EX, "Alice"), VF.createURI(EX, "Department1")));
+ Assert.assertEquals(expected, solutions);
+ Assert.assertEquals(24, tool.getNumInferences());
+ }
+
+ private void insertDataFile(URL dataFile, String defaultNamespace) throws Exception {
+ RDFFormat format = Rio.getParserFormatForFileName(dataFile.getFile());
+ SailRepositoryConnection conn = repository.getConnection();
+ try {
+ conn.add(dataFile, defaultNamespace, format);
+ } finally {
+ closeQuietly(conn);
+ }
+ }
+
+ Set<BindingSet> executeQuery(URL queryFile) throws Exception {
+ SailRepositoryConnection conn = repository.getConnection();
+ try {
+ InputStream queryIS = queryFile.openStream();
+ BufferedReader br = new BufferedReader(new java.io.InputStreamReader(queryIS, "UTF-8"));
+ String query = br.lines().collect(Collectors.joining("\n"));
+ br.close();
+ TupleQuery tupleQuery = conn.prepareTupleQuery(QueryLanguage.SPARQL, query);
+ TupleQueryResult result = tupleQuery.evaluate();
+ Set<BindingSet> solutions = new HashSet<>();
+ while (result.hasNext()) {
+ solutions.add(result.next());
+ }
+ return solutions;
+ } finally {
+ closeQuietly(conn);
+ }
+ }
+
+ private static MongoDBRdfConfiguration getConf() throws Exception {
+ MongoDBIndexingConfigBuilder builder = MongoIndexingConfiguration.builder().setUseMockMongo(true);
+ final MongoClient c = EmbeddedMongoFactory.newFactory().newMongoClient();
+ final ServerAddress address = c.getAddress();
+ builder.setMongoHost(address.getHost());
+ builder.setMongoPort(Integer.toString(address.getPort()));
+ builder.setUseInference(false);
+ c.close();
+ return builder.build();
+ }
+
+ private static void closeQuietly(final SailRepositoryConnection conn) {
+ if (conn != null) {
+ try {
+ conn.close();
+ } catch (final RepositoryException e) {
+ // quietly absorb this exception
+ }
+ }
+ }
+}
diff --git a/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/rule/AntecedentVisitorTest.java b/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/rule/AntecedentVisitorTest.java
new file mode 100644
index 0000000..7761a1a
--- /dev/null
+++ b/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/rule/AntecedentVisitorTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.rule;
+
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.FOAF;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.model.vocabulary.RDFS;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.algebra.StatementPattern.Scope;
+import org.openrdf.query.parser.ParsedQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.collect.Sets;
+
+public class AntecedentVisitorTest {
+ private static Var c(Value val) {
+ Var v = new Var("-const-" + val.stringValue(), val);
+ v.setAnonymous(true);
+ return v;
+ }
+
+ private static ValueFactory VF = ValueFactoryImpl.getInstance();
+ private static String EX = "http://example.org/";
+ private static URI G1 = VF.createURI(EX, "Graph1");
+ private static URI G2 = VF.createURI(EX, "Graph2");
+
+ @Test
+ public void testSelectQuery() throws Exception {
+ String text = "PREFIX foaf: <" + FOAF.NAMESPACE + ">\n"
+ + "SELECT * WHERE {\n"
+ + " ?x a foaf:Person .\n"
+ + " ?y a foaf:Person .\n"
+ + " ?x foaf:knows ?y .\n"
+ + "}";
+ ParsedQuery query = new SPARQLParser().parseQuery(text, null);
+ AntecedentVisitor visitor = new AntecedentVisitor();
+ query.getTupleExpr().visit(visitor);
+ Set<StatementPattern> expected = Sets.newHashSet(
+ new StatementPattern(new Var("x"), c(RDF.TYPE), c(FOAF.PERSON)),
+ new StatementPattern(new Var("y"), c(RDF.TYPE), c(FOAF.PERSON)),
+ new StatementPattern(new Var("x"), c(FOAF.KNOWS), new Var("y")));
+ Assert.assertEquals(expected, visitor.getAntecedents());
+ }
+
+ @Test
+ public void testConstructQuery() throws Exception {
+ String text = "PREFIX foaf: <" + FOAF.NAMESPACE + ">\n"
+ + "CONSTRUCT {\n"
+ + " ?y foaf:knows ?x .\n"
+ + " ?y <urn:knows> ?x .\n"
+ + " ?x <urn:knows> ?y .\n"
+ + "} WHERE {\n"
+ + " ?x a foaf:Person .\n"
+ + " ?y a foaf:Person .\n"
+ + " ?x foaf:knows ?y .\n"
+ + "}";
+ ParsedQuery query = new SPARQLParser().parseQuery(text, null);
+ AntecedentVisitor visitor = new AntecedentVisitor();
+ query.getTupleExpr().visit(visitor);
+ Set<StatementPattern> expected = Sets.newHashSet(
+ new StatementPattern(new Var("x"), c(RDF.TYPE), c(FOAF.PERSON)),
+ new StatementPattern(new Var("y"), c(RDF.TYPE), c(FOAF.PERSON)),
+ new StatementPattern(new Var("x"), c(FOAF.KNOWS), new Var("y")));
+ Assert.assertEquals(expected, visitor.getAntecedents());
+ }
+
+ @Test
+ public void testComplexQuery() throws Exception {
+ String text = "PREFIX foaf: <" + FOAF.NAMESPACE + ">\n"
+ + "PREFIX ex: <" + EX + ">\n"
+ + "SELECT * WHERE {\n"
+ + " { ?x a foaf:Person } UNION {\n"
+ + " GRAPH ex:Graph1 { ?y a foaf:Person }\n"
+ + " } .\n"
+ + " GRAPH ex:Graph2 {\n"
+ + " ?x foaf:knows ?y .\n"
+ + " }\n ."
+ + " OPTIONAL { ?x foaf:mbox ?m } .\n"
+ + " FILTER (?x != ?y) .\n"
+ + "}";
+ ParsedQuery query = new SPARQLParser().parseQuery(text, null);
+ AntecedentVisitor visitor = new AntecedentVisitor();
+ query.getTupleExpr().visit(visitor);
+ Set<StatementPattern> expected = Sets.newHashSet(
+ new StatementPattern(Scope.NAMED_CONTEXTS, new Var("y"), c(RDF.TYPE), c(FOAF.PERSON), c(G1)),
+ new StatementPattern(new Var("x"), c(RDF.TYPE), c(FOAF.PERSON)),
+ new StatementPattern(Scope.NAMED_CONTEXTS, new Var("x"), c(FOAF.KNOWS), new Var("y"), c(G2)),
+ new StatementPattern(new Var("x"), c(FOAF.MBOX), new Var("m")));
+ Assert.assertEquals(expected, visitor.getAntecedents());
+ }
+
+ @Test
+ public void testBNodeQuery() throws Exception {
+ String text = "PREFIX foaf: <" + FOAF.NAMESPACE + ">\n"
+ + "SELECT * WHERE {\n"
+ + " ?x a [ rdfs:subClassOf foaf:Person ] .\n"
+ + " ?x foaf:knows ?y .\n"
+ + "}";
+ ParsedQuery query = new SPARQLParser().parseQuery(text, null);
+ AntecedentVisitor visitor = new AntecedentVisitor();
+ query.getTupleExpr().visit(visitor);
+ Set<StatementPattern> actual = visitor.getAntecedents();
+ Assert.assertEquals(3, actual.size());
+ StatementPattern knows = new StatementPattern(new Var("x"), c(FOAF.KNOWS), new Var("y"));
+ Assert.assertTrue(actual.remove(knows));
+ Assert.assertTrue(actual.removeIf(sp -> {
+ return sp.getSubjectVar().equals(new Var("x"))
+ && RDF.TYPE.equals(sp.getPredicateVar().getValue())
+ && sp.getObjectVar().getValue() == null;
+ }));
+ Assert.assertTrue(actual.removeIf(sp -> {
+ return sp.getSubjectVar().getValue() == null
+ && RDFS.SUBCLASSOF.equals(sp.getPredicateVar().getValue())
+ && FOAF.PERSON.equals(sp.getObjectVar().getValue());
+ }));
+ }
+
+ @Test
+ public void testNoSP() throws Exception {
+ String text = "CONSTRUCT {\n"
+ + " owl:Thing a owl:Class ."
+ + " owl:Nothing a owl:Class ."
+ + " owl:Nothing rdfs:subClassOf owl:Thing ."
+ + "} WHERE { }";
+ ParsedQuery query = new SPARQLParser().parseQuery(text, null);
+ AntecedentVisitor visitor = new AntecedentVisitor();
+ query.getTupleExpr().visit(visitor);
+ Set<StatementPattern> expected = Sets.newHashSet();
+ Assert.assertEquals(expected, visitor.getAntecedents());
+ }
+}
diff --git a/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/rule/ConstructConsequentVisitorTest.java b/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/rule/ConstructConsequentVisitorTest.java
new file mode 100644
index 0000000..0865ef8
--- /dev/null
+++ b/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/rule/ConstructConsequentVisitorTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.rule;
+
+import java.util.Arrays;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.Value;
+import org.openrdf.model.vocabulary.FOAF;
+import org.openrdf.model.vocabulary.OWL;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.query.algebra.BNodeGenerator;
+import org.openrdf.query.algebra.Extension;
+import org.openrdf.query.algebra.ExtensionElem;
+import org.openrdf.query.algebra.MultiProjection;
+import org.openrdf.query.algebra.Projection;
+import org.openrdf.query.algebra.ProjectionElem;
+import org.openrdf.query.algebra.ProjectionElemList;
+import org.openrdf.query.algebra.SingletonSet;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.ValueConstant;
+import org.openrdf.query.algebra.Var;
+
+import com.google.common.collect.Sets;
+
+public class ConstructConsequentVisitorTest {
+ private static Var s(Value val) {
+ return new Var("subject", val);
+ }
+ private static Var p(Value val) {
+ return new Var("predicate", val);
+ }
+ private static Var o(Value val) {
+ return new Var("object", val);
+ }
+ private static Var anon(Var var) {
+ var.setAnonymous(true);
+ return var;
+ }
+
+ @Test
+ public void testGenericSP() {
+ Extension extension = new Extension(new SingletonSet(),
+ new ExtensionElem(new Var("z"), "z"));
+ Projection projection = new Projection(extension, new ProjectionElemList(
+ new ProjectionElem("x", "subject"),
+ new ProjectionElem("y", "predicate"),
+ new ProjectionElem("z", "object")));
+ ConstructConsequentVisitor visitor = new ConstructConsequentVisitor();
+ projection.visit(visitor);
+ Set<StatementPattern> expected = Sets.newHashSet(
+ new StatementPattern(s(null), p(null), o(null)));
+ Assert.assertEquals(expected, visitor.getConsequents());
+ }
+
+ @Test
+ public void testConcreteSP() {
+ Extension extension = new Extension(new SingletonSet(),
+ new ExtensionElem(new ValueConstant(FOAF.PERSON), "x"),
+ new ExtensionElem(new ValueConstant(RDF.TYPE), "y"),
+ new ExtensionElem(new ValueConstant(OWL.CLASS), "z"));
+ Projection projection = new Projection(extension, new ProjectionElemList(
+ new ProjectionElem("x", "subject"),
+ new ProjectionElem("y", "predicate"),
+ new ProjectionElem("z", "object")));
+ ConstructConsequentVisitor visitor = new ConstructConsequentVisitor();
+ projection.visit(visitor);
+ Set<StatementPattern> expected = Sets.newHashSet(
+ new StatementPattern(s(FOAF.PERSON), p(RDF.TYPE), o(OWL.CLASS)));
+ Assert.assertEquals(expected, visitor.getConsequents());
+ }
+
+ @Test
+ public void testMissingVariables() {
+ Extension extension = new Extension(new SingletonSet(),
+ new ExtensionElem(new ValueConstant(FOAF.PERSON), "x"),
+ new ExtensionElem(new ValueConstant(RDF.TYPE), "y"));
+ Projection projection = new Projection(extension, new ProjectionElemList(
+ new ProjectionElem("x", "s"),
+ new ProjectionElem("y", "predicate"),
+ new ProjectionElem("z", "object")));
+ ConstructConsequentVisitor visitor = new ConstructConsequentVisitor();
+ projection.visit(visitor);
+ Set<StatementPattern> expected = Sets.newHashSet(
+ new StatementPattern(s(null), p(RDF.TYPE), o(null)));
+ Assert.assertEquals(expected, visitor.getConsequents());
+ }
+
+ @Test
+ public void testMultiProjection() {
+ Extension extension = new Extension(new SingletonSet(),
+ new ExtensionElem(new ValueConstant(RDF.TYPE), "rdftype"),
+ new ExtensionElem(new ValueConstant(OWL.OBJECTPROPERTY), "owlprop"),
+ new ExtensionElem(new ValueConstant(OWL.EQUIVALENTCLASS), "owleqcls"),
+ new ExtensionElem(new ValueConstant(OWL.CLASS), "owlclass"));
+ MultiProjection projection = new MultiProjection(extension, Arrays.asList(
+ new ProjectionElemList(
+ new ProjectionElem("cls", "subject"),
+ new ProjectionElem("rdftype", "predicate"),
+ new ProjectionElem("owlclass", "object")),
+ new ProjectionElemList(
+ new ProjectionElem("prop", "subject"),
+ new ProjectionElem("rdftype", "predicate"),
+ new ProjectionElem("owlprop", "object")),
+ new ProjectionElemList(
+ new ProjectionElem("owleqcls", "predicate"),
+ new ProjectionElem("cls", "object"))));
+ ConstructConsequentVisitor visitor = new ConstructConsequentVisitor();
+ projection.visit(visitor);
+ Set<StatementPattern> expected = Sets.newHashSet(
+ new StatementPattern(s(null), p(RDF.TYPE), o(OWL.CLASS)),
+ new StatementPattern(s(null), p(RDF.TYPE), o(OWL.OBJECTPROPERTY)),
+ new StatementPattern(s(null), p(OWL.EQUIVALENTCLASS), o(null)));
+ Assert.assertEquals(expected, visitor.getConsequents());
+ }
+
+ @Test
+ public void testNoExtension() {
+ StatementPattern sp = new StatementPattern(new Var("x"), new Var("y"), new Var("z"));
+ Projection projection = new Projection(sp, new ProjectionElemList(
+ new ProjectionElem("x", "subject"),
+ new ProjectionElem("y", "predicate"),
+ new ProjectionElem("z", "object")));
+ ConstructConsequentVisitor visitor = new ConstructConsequentVisitor();
+ projection.visit(visitor);
+ Set<StatementPattern> expected = Sets.newHashSet(
+ new StatementPattern(s(null), p(null), o(null)));
+ Assert.assertEquals(expected, visitor.getConsequents());
+ }
+
+ @Test
+ public void testBNode() {
+ Extension extension = new Extension(new SingletonSet(),
+ new ExtensionElem(new Var("x"), "x"),
+ new ExtensionElem(new BNodeGenerator(), "z"));
+ Projection projection = new Projection(extension, new ProjectionElemList(
+ new ProjectionElem("x", "subject"),
+ new ProjectionElem("y", "predicate"),
+ new ProjectionElem("z", "object")));
+ ConstructConsequentVisitor visitor = new ConstructConsequentVisitor();
+ projection.visit(visitor);
+ Set<StatementPattern> expected = Sets.newHashSet(
+ new StatementPattern(s(null), p(null), anon(o(null))));
+ Assert.assertEquals(expected, visitor.getConsequents());
+ }
+}
diff --git a/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/rule/RulesetTest.java b/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/rule/RulesetTest.java
new file mode 100644
index 0000000..adb851b
--- /dev/null
+++ b/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/rule/RulesetTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.rule;
+
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.rya.api.domain.StatementMetadata;
+import org.apache.rya.forwardchain.ForwardChainException;
+import org.apache.rya.forwardchain.strategy.AbstractRuleExecutionStrategy;
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.Value;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.model.vocabulary.RDFS;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.Var;
+
+import com.google.common.collect.Sets;
+
+public class RulesetTest {
+ private static Var c(Value val) {
+ Var v = new Var("-const-" + val.stringValue(), val);
+ v.setAnonymous(true);
+ return v;
+ }
+
+ private static class TestRule implements Rule {
+ private final Collection<StatementPattern> consume;
+ private final Collection<StatementPattern> produce;
+ TestRule(Collection<StatementPattern> consume, Collection<StatementPattern> produce) {
+ this.consume = consume;
+ this.produce = produce;
+ }
+ @Override
+ public boolean canConclude(StatementPattern sp) {
+ return produce.contains(sp);
+ }
+ @Override
+ public Collection<StatementPattern> getAntecedentPatterns() {
+ return consume;
+ }
+ @Override
+ public Collection<StatementPattern> getConsequentPatterns() {
+ return produce;
+ }
+ @Override
+ public long execute(AbstractRuleExecutionStrategy strategy,
+ StatementMetadata metadata) throws ForwardChainException {
+ return 0;
+ }
+ }
+
+ @Test
+ public void testDependencies() {
+ StatementPattern genericSP = new StatementPattern(new Var("a"), new Var("b"), new Var("c"));
+ StatementPattern typeSP = new StatementPattern(new Var("x"), c(RDF.TYPE), new Var("t"));
+ StatementPattern scoSP = new StatementPattern(new Var("x"), c(RDFS.SUBCLASSOF), new Var("y"));
+ Rule typeTriggersAny = new TestRule(
+ Sets.newHashSet(typeSP),
+ Sets.newHashSet(genericSP, typeSP, scoSP));
+ Rule subclassTriggersType = new TestRule(
+ Sets.newHashSet(scoSP),
+ Sets.newHashSet(genericSP, typeSP));
+ Rule anyTriggersNothing = new TestRule(
+ Sets.newHashSet(genericSP),
+ Sets.newHashSet());
+ Set<Rule> allRules = Sets.newHashSet(anyTriggersNothing, subclassTriggersType, typeTriggersAny);
+ Set<Rule> noRules = Sets.newHashSet();
+ Set<Rule> produceType = Sets.newHashSet(subclassTriggersType, typeTriggersAny);
+ Set<Rule> produceSubclass = Sets.newHashSet(typeTriggersAny);
+ Set<Rule> produceAny = Sets.newHashSet(subclassTriggersType, typeTriggersAny);
+ Set<Rule> consumeType = Sets.newHashSet(anyTriggersNothing, typeTriggersAny);
+ Ruleset ruleset = new Ruleset(allRules);
+ Assert.assertEquals(produceType, ruleset.getPredecessorsOf(typeTriggersAny));
+ Assert.assertEquals(allRules, ruleset.getSuccessorsOf(typeTriggersAny));
+ Assert.assertEquals(produceSubclass, ruleset.getPredecessorsOf(subclassTriggersType));
+ Assert.assertEquals(consumeType, ruleset.getSuccessorsOf(subclassTriggersType));
+ Assert.assertEquals(produceAny, ruleset.getPredecessorsOf(anyTriggersNothing));
+ Assert.assertEquals(noRules, ruleset.getSuccessorsOf(anyTriggersNothing));
+ }
+
+ @Test
+ public void testIndirectDependencies() {
+ StatementPattern genericSP = new StatementPattern(new Var("a"), new Var("b"), new Var("c"));
+ StatementPattern typeSP = new StatementPattern(new Var("x"), c(RDF.TYPE), new Var("t"));
+ StatementPattern scoSP = new StatementPattern(new Var("x"), c(RDFS.SUBCLASSOF), new Var("y"));
+ StatementPattern spoSP = new StatementPattern(new Var("x"), c(RDFS.SUBPROPERTYOF), new Var("y"));
+ Rule typeTriggersAny = new TestRule(
+ Sets.newHashSet(typeSP),
+ Sets.newHashSet(genericSP, typeSP, scoSP));
+ Rule subclassTriggersType = new TestRule(
+ Sets.newHashSet(scoSP),
+ Sets.newHashSet(genericSP, typeSP));
+ Rule anyTriggersNothing = new TestRule(
+ Sets.newHashSet(genericSP),
+ Sets.newHashSet());
+ Rule typeTriggersSubprop = new TestRule(
+ Sets.newHashSet(typeSP),
+ Sets.newHashSet(genericSP, spoSP));
+ Set<Rule> allRules = Sets.newHashSet(anyTriggersNothing, subclassTriggersType,
+ typeTriggersAny, typeTriggersSubprop);
+ Ruleset ruleset = new Ruleset(allRules);
+ Assert.assertTrue(ruleset.pathExists(typeTriggersAny, typeTriggersAny));
+ Assert.assertTrue(ruleset.pathExists(typeTriggersAny, subclassTriggersType));
+ Assert.assertTrue(ruleset.pathExists(typeTriggersAny, anyTriggersNothing));
+ Assert.assertTrue(ruleset.pathExists(typeTriggersAny, typeTriggersSubprop));
+ Assert.assertTrue(ruleset.pathExists(subclassTriggersType, typeTriggersAny));
+ Assert.assertTrue(ruleset.pathExists(subclassTriggersType, subclassTriggersType));
+ Assert.assertTrue(ruleset.pathExists(subclassTriggersType, anyTriggersNothing));
+ Assert.assertTrue(ruleset.pathExists(subclassTriggersType, typeTriggersSubprop));
+ Assert.assertFalse(ruleset.pathExists(anyTriggersNothing, typeTriggersAny));
+ Assert.assertFalse(ruleset.pathExists(anyTriggersNothing, subclassTriggersType));
+ Assert.assertFalse(ruleset.pathExists(anyTriggersNothing, anyTriggersNothing));
+ Assert.assertFalse(ruleset.pathExists(anyTriggersNothing, typeTriggersSubprop));
+ Assert.assertFalse(ruleset.pathExists(typeTriggersSubprop, typeTriggersAny));
+ Assert.assertFalse(ruleset.pathExists(typeTriggersSubprop, subclassTriggersType));
+ Assert.assertTrue(ruleset.pathExists(typeTriggersSubprop, anyTriggersNothing));
+ Assert.assertFalse(ruleset.pathExists(typeTriggersSubprop, typeTriggersSubprop));
+ }
+}
diff --git a/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/rule/SpinConstructRuleTest.java b/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/rule/SpinConstructRuleTest.java
new file mode 100644
index 0000000..9bbcce0
--- /dev/null
+++ b/extras/rya.forwardchain/src/test/java/org/apache/rya/forwardchain/rule/SpinConstructRuleTest.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.rya.forwardchain.rule;
+
+import java.util.Arrays;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.openrdf.model.URI;
+import org.openrdf.model.Value;
+import org.openrdf.model.ValueFactory;
+import org.openrdf.model.impl.ValueFactoryImpl;
+import org.openrdf.model.vocabulary.FOAF;
+import org.openrdf.model.vocabulary.OWL;
+import org.openrdf.model.vocabulary.RDF;
+import org.openrdf.model.vocabulary.RDFS;
+import org.openrdf.query.algebra.StatementPattern;
+import org.openrdf.query.algebra.Var;
+import org.openrdf.query.parser.ParsedGraphQuery;
+import org.openrdf.query.parser.sparql.SPARQLParser;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.Multiset;
+
+public class SpinConstructRuleTest {
+ private static ValueFactory VF = ValueFactoryImpl.getInstance();
+ private static SPARQLParser PARSER = new SPARQLParser();
+
+ private static URI RL_CAX_SCO = VF.createURI("http://example.org/rl/cax-sco");
+ private static URI RL_SCM_CLS = VF.createURI("http://example.org/rl/scm-cls");
+ private static URI RL_PRP_SPO1 = VF.createURI("http://example.org/rl/prp-spo");
+ private static URI LIVING_THING = VF.createURI("http://example.org/LivingThing");
+
+ private static Var c(Value val) {
+ return new Var("-const-" + val.stringValue(), val);
+ }
+ private static Var ac(Value val) {
+ Var v = c(val);
+ v.setAnonymous(true);
+ return v;
+ }
+
+ @Test
+ public void testEmptyWhere() throws Exception {
+ String text = "CONSTRUCT {\n"
+ + " ?this a <" + LIVING_THING.stringValue() + "> .\n"
+ + "} WHERE { }";
+ ParsedGraphQuery query = (ParsedGraphQuery) PARSER.parseQuery(text, null);
+ SpinConstructRule rule = new SpinConstructRule(FOAF.PERSON, VF.createURI("urn:person-is-living"), query);
+ Multiset<StatementPattern> expectedAntecedents = HashMultiset.create(Arrays.asList(
+ new StatementPattern(new Var("this"), c(RDF.TYPE), c(FOAF.PERSON))));
+ Multiset<StatementPattern> expectedConsequents = HashMultiset.create(Arrays.asList(
+ new StatementPattern(new Var("subject"), new Var("predicate", RDF.TYPE), new Var("object", LIVING_THING))));
+ Assert.assertEquals(expectedAntecedents, HashMultiset.create(rule.getAntecedentPatterns()));
+ Assert.assertEquals(expectedConsequents, HashMultiset.create(rule.getConsequentPatterns()));
+ Assert.assertFalse(rule.hasAnonymousConsequent());
+ // Basic pattern matches
+ Assert.assertTrue(rule.canConclude(new StatementPattern(new Var("x"), c(RDF.TYPE), c(LIVING_THING))));
+ // Broader patterns match (variables in place of constants)
+ Assert.assertTrue(rule.canConclude(new StatementPattern(new Var("x"), c(RDF.TYPE), new Var("y"))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(new Var("x"), new Var("y"), c(LIVING_THING))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(new Var("a"), new Var("b"), new Var("c"))));
+ // Narrower patterns match (constants in place of variables)
+ Assert.assertTrue(rule.canConclude(new StatementPattern(c(RDF.TYPE), c(RDF.TYPE), c(LIVING_THING))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(c(FOAF.MBOX), c(RDF.TYPE), new Var("y"))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(c(RDF.ALT), new Var("y"), c(LIVING_THING))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(c(RDF.BAG), new Var("b"), new Var("c"))));
+ // Incompatible patterns don't match (different constants)
+ Assert.assertFalse(rule.canConclude(new StatementPattern(new Var("x"), c(RDFS.SUBCLASSOF), new Var("y"))));
+ Assert.assertFalse(rule.canConclude(new StatementPattern(new Var("x"), new Var("y"), c(FOAF.PERSON))));
+ Assert.assertFalse(rule.canConclude(new StatementPattern(c(RDF.TYPE), c(RDF.TYPE), c(RDF.TYPE))));
+ }
+
+ @Test
+ public void testThisUnbound() throws Exception {
+ String text = "CONSTRUCT {\n"
+ + " ?ind a ?superclass .\n"
+ + "} WHERE {\n"
+ + " ?ind a ?subclass .\n"
+ + " ?subclass rdfs:subClassOf ?superclass .\n"
+ + "}";
+ ParsedGraphQuery query = (ParsedGraphQuery) PARSER.parseQuery(text, null);
+ SpinConstructRule rule = new SpinConstructRule(OWL.THING, RL_CAX_SCO, query);
+ Multiset<StatementPattern> expectedAntecedents = HashMultiset.create(Arrays.asList(
+ new StatementPattern(new Var("subclass"), ac(RDFS.SUBCLASSOF), new Var("superclass")),
+ new StatementPattern(new Var("ind"), ac(RDF.TYPE), new Var("subclass"))));
+ Multiset<StatementPattern> expectedConsequents = HashMultiset.create(Arrays.asList(
+ new StatementPattern(new Var("subject"), new Var("predicate", RDF.TYPE), new Var("object"))));
+ Assert.assertEquals(expectedAntecedents, HashMultiset.create(rule.getAntecedentPatterns()));
+ Assert.assertEquals(expectedConsequents, HashMultiset.create(rule.getConsequentPatterns()));
+ Assert.assertFalse(rule.hasAnonymousConsequent());
+ // Basic pattern matches
+ Assert.assertTrue(rule.canConclude(new StatementPattern(new Var("x"), c(RDF.TYPE), new Var("y"))));
+ // Broader patterns match (variables in place of constants)
+ Assert.assertTrue(rule.canConclude(new StatementPattern(new Var("a"), new Var("b"), new Var("c"))));
+ // Narrower patterns match (constants in place of variables)
+ Assert.assertTrue(rule.canConclude(new StatementPattern(c(RDF.TYPE), c(RDF.TYPE), c(RDF.TYPE))));
+ // Incompatible patterns don't match (different constants)
+ Assert.assertFalse(rule.canConclude(new StatementPattern(new Var("x"), c(RDFS.SUBCLASSOF), new Var("y"))));
+ }
+
+ @Test
+ public void testMultipleConsequents() throws Exception {
+ String text = "CONSTRUCT {\n"
+ // actual rule is "?this subClassOf ?this", but reflexive construct patterns produce
+ // bnodes due to an openrdf bug, resulting in incorrect matches
+ + " ?this rdfs:subClassOf ?something .\n"
+ + " ?this owl:equivalentClass ?something .\n"
+ + " ?this rdfs:subClassOf owl:Thing .\n"
+ + " owl:Nothing rdfs:subClassOf ?this .\n"
+ + "} WHERE { }";
+ ParsedGraphQuery query = (ParsedGraphQuery) PARSER.parseQuery(text, null);
+ SpinConstructRule rule = new SpinConstructRule(OWL.CLASS, RL_SCM_CLS, query);
+ Multiset<StatementPattern> expectedAntecedents = HashMultiset.create(Arrays.asList(
+ new StatementPattern(new Var("this"), c(RDF.TYPE), c(OWL.CLASS))));
+ Multiset<StatementPattern> expectedConsequents = HashMultiset.create(Arrays.asList(
+ new StatementPattern(new Var("subject"), new Var("predicate", RDFS.SUBCLASSOF), new Var("object")),
+ new StatementPattern(new Var("subject"), new Var("predicate", OWL.EQUIVALENTCLASS), new Var("object")),
+ new StatementPattern(new Var("subject"), new Var("predicate", RDFS.SUBCLASSOF), new Var("object", OWL.THING)),
+ new StatementPattern(new Var("subject", OWL.NOTHING), new Var("predicate", RDFS.SUBCLASSOF), new Var("object"))));
+ Assert.assertEquals(expectedAntecedents, HashMultiset.create(rule.getAntecedentPatterns()));
+ Assert.assertEquals(expectedConsequents, HashMultiset.create(rule.getConsequentPatterns()));
+ // Basic pattern matches
+ Assert.assertTrue(rule.canConclude(new StatementPattern(new Var("x"), c(RDFS.SUBCLASSOF), new Var("y"))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(new Var("x"), c(OWL.EQUIVALENTCLASS), new Var("y"))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(new Var("x"), c(RDFS.SUBCLASSOF), c(OWL.THING))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(c(OWL.NOTHING), c(RDFS.SUBCLASSOF), new Var("y"))));
+ // Broader patterns match (variables in place of constants)
+ Assert.assertTrue(rule.canConclude(new StatementPattern(new Var("a"), new Var("b"), new Var("c"))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(new Var("a"), new Var("b"), c(OWL.THING))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(c(OWL.NOTHING), new Var("b"), new Var("c"))));
+ // Narrower patterns match (constants in place of variables)
+ Assert.assertTrue(rule.canConclude(new StatementPattern(c(FOAF.PERSON), c(RDFS.SUBCLASSOF), new Var("x"))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(c(FOAF.PERSON), c(OWL.EQUIVALENTCLASS), c(FOAF.PERSON))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(c(OWL.NOTHING), c(RDFS.SUBCLASSOF), c(FOAF.PERSON))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(c(OWL.NOTHING), c(OWL.EQUIVALENTCLASS), c(FOAF.PERSON))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(c(OWL.NOTHING), c(OWL.EQUIVALENTCLASS), c(OWL.THING))));
+ // Incompatible patterns don't match (different constants)
+ Assert.assertFalse(rule.canConclude(new StatementPattern(new Var("x"), c(RDFS.SUBPROPERTYOF), c(OWL.THING))));
+ }
+
+ @Test
+ public void testGeneralConsequent() throws Exception {
+ String text = "CONSTRUCT {\n"
+ + " ?x ?p2 ?y"
+ + "} WHERE {\n"
+ + " ?x ?p1 ?y .\n"
+ + " ?p1 rdfs:subPropertyOf ?p2 .\n"
+ + "}";
+ ParsedGraphQuery query = (ParsedGraphQuery) PARSER.parseQuery(text, null);
+ SpinConstructRule rule = new SpinConstructRule(OWL.THING, RL_PRP_SPO1, query);
+ Multiset<StatementPattern> expectedAntecedents = HashMultiset.create(Arrays.asList(
+ new StatementPattern(new Var("p1"), ac(RDFS.SUBPROPERTYOF), new Var("p2")),
+ new StatementPattern(new Var("x"), new Var("p1"), new Var("y"))));
+ Multiset<StatementPattern> expectedConsequents = HashMultiset.create(Arrays.asList(
+ new StatementPattern(new Var("subject"), new Var("predicate"), new Var("object"))));
+ Assert.assertEquals(expectedAntecedents, HashMultiset.create(rule.getAntecedentPatterns()));
+ Assert.assertEquals(expectedConsequents, HashMultiset.create(rule.getConsequentPatterns()));
+ Assert.assertFalse(rule.hasAnonymousConsequent());
+ // Basic pattern matches
+ Assert.assertTrue(rule.canConclude(new StatementPattern(new Var("a"), new Var("b"), new Var("c"))));
+ // Narrower patterns match (constants in place of variables)
+ Assert.assertTrue(rule.canConclude(new StatementPattern(new Var("x"), c(RDFS.SUBPROPERTYOF), c(OWL.THING))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(c(OWL.NOTHING), new Var("prop"), c(OWL.THING))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(c(FOAF.PERSON), c(RDFS.SUBCLASSOF), new Var("x"))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(c(OWL.NOTHING), c(RDFS.SUBCLASSOF), c(FOAF.PERSON))));
+ }
+
+ @Test
+ public void testAnonymousConsequent() throws Exception {
+ String text = "CONSTRUCT {\n"
+ + " ?x ?p2 _:something"
+ + "} WHERE {\n"
+ + " ?x ?p1 ?y .\n"
+ + " ?p1 rdfs:subPropertyOf ?p2 .\n"
+ + "}";
+ ParsedGraphQuery query = (ParsedGraphQuery) PARSER.parseQuery(text, null);
+ SpinConstructRule rule = new SpinConstructRule(OWL.THING, RL_PRP_SPO1, query);
+ Multiset<StatementPattern> expectedAntecedents = HashMultiset.create(Arrays.asList(
+ new StatementPattern(new Var("p1"), ac(RDFS.SUBPROPERTYOF), new Var("p2")),
+ new StatementPattern(new Var("x"), new Var("p1"), new Var("y"))));
+ Assert.assertEquals(expectedAntecedents, HashMultiset.create(rule.getAntecedentPatterns()));
+ // should have detected anonymous node
+ Assert.assertTrue(rule.hasAnonymousConsequent());
+ Var anonymousObject = new Var("object");
+ anonymousObject.setAnonymous(true);
+ Multiset<StatementPattern> expectedConsequents = HashMultiset.create(Arrays.asList(
+ new StatementPattern(new Var("subject"), new Var("predicate"), anonymousObject)));
+ Assert.assertEquals(expectedConsequents, HashMultiset.create(rule.getConsequentPatterns()));
+ // Pattern matches should be unaffected by anonymous node status
+ Assert.assertTrue(rule.canConclude(new StatementPattern(new Var("a"), new Var("b"), new Var("c"))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(new Var("x"), c(RDFS.SUBPROPERTYOF), c(OWL.THING))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(c(OWL.NOTHING), new Var("prop"), c(OWL.THING))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(c(FOAF.PERSON), c(RDFS.SUBCLASSOF), new Var("x"))));
+ Assert.assertTrue(rule.canConclude(new StatementPattern(c(OWL.NOTHING), c(RDFS.SUBCLASSOF), c(FOAF.PERSON))));
+ }
+}
diff --git a/extras/rya.forwardchain/src/test/resources/data.ttl b/extras/rya.forwardchain/src/test/resources/data.ttl
new file mode 100644
index 0000000..f026409
--- /dev/null
+++ b/extras/rya.forwardchain/src/test/resources/data.ttl
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Sample data similar to LUBM
+
+@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
+@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
+@prefix owl: <http://www.w3.org/2002/07/owl#> .
+@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
+@prefix lubm: <http://swat.cse.lehigh.edu/onto/univ-bench.owl#> .
+@prefix ex: <http://example.org/> .
+
+ex:Department0 lubm:subOrganizationOf ex:College0 .
+ex:Department1 lubm:subOrganizationOf ex:College1 .
+ex:Department2 lubm:subOrganizationOf ex:College2 .
+ex:Department3 lubm:subOrganizationOf ex:College2 .
+
+ex:College0 a lubm:Organization ; lubm:subOrganizationOf ex:University0 .
+ex:College1 a lubm:Organization ; lubm:subOrganizationOf ex:University0 .
+ex:College2 lubm:subOrganizationOf ex:University1 .
+
+ex:Department0 a lubm:Department .
+ex:Department1 a lubm:Department .
+ex:Department2 a lubm:Department .
+ex:Department3 a lubm:Department .
+
+# Professors -- infer Faculty and therefore Person
+ex:Alice a lubm:Professor .
+ex:Bob a lubm:Professor .
+ex:Carol a lubm:Professor .
+ex:Dan a lubm:Professor .
+ex:Eve a lubm:Professor .
+
+# Can infer Organization via rdfs:range
+ex:Alice lubm:worksFor ex:Department2 .
+ex:Carol lubm:worksFor ex:Department0 .
+ex:Dan lubm:worksFor ex:Department2 .
+ex:Eve lubm:worksFor ex:Department1 .
+
+ex:Alice lubm:headOf ex:Department1 . # infer Chair and worksFor
+ex:Dan lubm:headOf ex:Department2 . # infer Chair, already have worksFor
+ex:Eve lubm:headOf ex:ResearchGroup3 . # infer worksFor, therefore Organization, but not Chair because not a Department
\ No newline at end of file
diff --git a/extras/rya.forwardchain/src/test/resources/owlrl.ttl b/extras/rya.forwardchain/src/test/resources/owlrl.ttl
new file mode 100644
index 0000000..b9e67eb
--- /dev/null
+++ b/extras/rya.forwardchain/src/test/resources/owlrl.ttl
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Expresses a fragment of OWL RL in SPIN rules
+
+@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
+@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
+@prefix owl: <http://www.w3.org/2002/07/owl#> .
+@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
+@prefix lubm: <http://swat.cse.lehigh.edu/onto/univ-bench.owl#> .
+@prefix spin: <http://spinrdf.org/spin#> .
+@prefix sp: <http://spinrdf.org/sp#> .
+@prefix rl: <http://example.org/OWL/RL/> .
+
+owl:Thing spin:rule rl:cls-svf1 ,
+ rl:cax-sco ,
+ rl:prp-spo1 ,
+ rl:prp-dom ,
+ rl:prp-rng .
+
+owl:Class spin:rule rl:scm-cls .
+
+rl:cls-svf1 a sp:Construct;
+ spin:thisUnbound "true"^^xsd:boolean ;
+ sp:text """
+ CONSTRUCT {
+ ?u a ?x .
+ }
+ WHERE {
+ ?x owl:someValuesFrom ?y .
+ ?x owl:onProperty ?p .
+ ?u ?p ?v .
+ ?v a ?y .
+ } """ .
+
+rl:cax-sco a sp:Construct;
+ spin:thisUnbound "true"^^xsd:boolean ;
+ sp:text """
+ CONSTRUCT {
+ ?this a ?super .
+ }
+ WHERE {
+ ?this a ?sub .
+ ?sub rdfs:subClassOf ?super .
+ } """ .
+
+rl:prp-spo1 a sp:Construct;
+ spin:thisUnbound "true"^^xsd:boolean ;
+ sp:text """
+ CONSTRUCT {
+ ?x ?super ?y .
+ }
+ WHERE {
+ ?sub rdfs:subPropertyOf ?super .
+ ?x ?sub ?y .
+ } """ .
+
+rl:prp-dom a sp:Construct;
+ spin:thisUnbound "true"^^xsd:boolean ;
+ sp:text """
+ CONSTRUCT {
+ ?s a ?c .
+ }
+ WHERE {
+ ?p rdfs:domain ?c .
+ ?s ?p ?o .
+ } """ .
+
+rl:prp-rng a rl:prp-rng-template .
+rl:prp-rng-template a sp:Template;
+ spin:body [
+ a sp:Construct ;
+ sp:text """
+ CONSTRUCT {
+ ?o a ?c .
+ }
+ WHERE {
+ ?p rdfs:range ?c .
+ ?s ?p ?o .
+ } """ ] .
+
+rl:scm-cls a sp:Construct;
+ sp:text """
+ CONSTRUCT {
+ ?this rdfs:subClassOf ?this .
+ ?this owl:equivalentClass ?this .
+ ?this rdfs:subClassOf owl:Thing .
+ owl:Nothing rdfs:subClassOf ?this .
+ }
+ WHERE { } """ .
+rdfs:subClassOf rdfs:domain owl:Class .
+rdfs:subClassOf rdfs:range owl:Class .
\ No newline at end of file
diff --git a/extras/rya.forwardchain/src/test/resources/query.sparql b/extras/rya.forwardchain/src/test/resources/query.sparql
new file mode 100644
index 0000000..3b93cc8
--- /dev/null
+++ b/extras/rya.forwardchain/src/test/resources/query.sparql
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# LUBM query #12
+
+PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
+PREFIX owl: <http://www.w3.org/2002/07/owl#>
+PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
+PREFIX lubm: <http://swat.cse.lehigh.edu/onto/univ-bench.owl#>
+PREFIX ex: <http://example.org/>
+
+SELECT ?X ?Y WHERE {
+ ?X a lubm:Chair .
+ ?Y a lubm:Department .
+ ?X lubm:worksFor ?Y .
+ ?Y lubm:subOrganizationOf ex:University0 .
+}
\ No newline at end of file
diff --git a/extras/rya.forwardchain/src/test/resources/university.ttl b/extras/rya.forwardchain/src/test/resources/university.ttl
new file mode 100644
index 0000000..e195606
--- /dev/null
+++ b/extras/rya.forwardchain/src/test/resources/university.ttl
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Expresses a fragment of the LUBM ontology in a mixture of OWL and SPIN
+
+@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
+@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
+@prefix owl: <http://www.w3.org/2002/07/owl#> .
+@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
+@prefix lubm: <http://swat.cse.lehigh.edu/onto/univ-bench.owl#> .
+@prefix spin: <http://spinrdf.org/spin#> .
+@prefix sp: <http://spinrdf.org/sp#> .
+@prefix lr: <http://example.org/LUBM/> .
+
+lubm:Person spin:rule lr:department-head-is-chair .
+lr:department-head-is-chair a sp:Construct ;
+ sp:text """
+ PREFIX lubm: <http://swat.cse.lehigh.edu/onto/univ-bench.owl#>
+ CONSTRUCT {
+ ?this a lubm:Chair .
+ }
+ WHERE {
+ ?this lubm:headOf [ a lubm:Department ] .
+ }
+ """ .
+
+lubm:Organization spin:rule lr:suborganization-transitivity .
+lr:suborganization-transitivity a sp:Construct ;
+ sp:text """
+ PREFIX lubm: <http://swat.cse.lehigh.edu/onto/univ-bench.owl#>
+ CONSTRUCT {
+ ?this lubm:subOrganizationOf ?parent .
+ }
+ WHERE {
+ ?this lubm:subOrganizationOf ?child .
+ ?child lubm:subOrganizationOf ?parent .
+ }
+ """ .
+
+lubm:Professor rdfs:subClassOf lubm:Faculty .
+lubm:Faculty rdfs:subClassOf lubm:Person .
+
+lubm:worksFor rdfs:range lubm:Organization .
+lubm:headOf rdfs:subPropertyOf lubm:worksFor .
\ No newline at end of file