blob: 9d54f9356fe0117bfd2db597352b64c7f815ad91 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.forwardchain.strategy;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.apache.rya.api.domain.RyaType;
import org.apache.rya.api.domain.StatementMetadata;
import org.apache.rya.forwardchain.ForwardChainConstants;
import org.apache.rya.forwardchain.ForwardChainException;
import org.apache.rya.forwardchain.rule.Rule;
import org.apache.rya.forwardchain.rule.Ruleset;
import org.eclipse.rdf4j.model.vocabulary.XMLSchema;
import com.google.common.base.Preconditions;
/**
* A simple {@link AbstractForwardChainStrategy} that iterates over every
* relevant rule once, and repeats until no rules are relevant.
* <p>
* Initially, all rules are considered relevant. Iteration 1 executes each rule
* once.
* <p>
* When a rule produces inferences, all rules that are marked as that rule's
* successors according to the {@link Ruleset} are triggered as potentially
* relevant for future execution. If a triggered rule is scheduled to be
* executed during the current iteration, nothing changes. If a triggered rule
* has already been executed once during the current iteration, or was not
* activated for the current iteration at all, it is flagged to be executed
* during the next iteration.
* <p>
* When an iteration concludes, a new iteration begins with the relevant set of
* rules having been determined during the previous iteration. If there are no
* such rules, forward chaining ends.
* <p>
* Within each iteration, rules are processed such that a rule which may trigger
* many other rules is given priority over a rule that may be triggered by many
* other rules.
* <p>
* The observation that one rule may trigger another is based on the
* relationships between triple patterns produced and consumed by the rules in
* general, not based on any triples that were actually generated. Therefore,
* there may be false positives but not false negatives: Rules triggered by the
* current rule may or may not produce more triples in response, but any rule
* that could produce triples in response will be triggered.
* <p>
* The procedure for executing the individual rules is governed by the
* {@link RuleExecutionStrategy}. This class uses the strategy's reported counts
* to determine whether or not a rule has produced inferences.
*/
public class RoundRobinStrategy extends AbstractForwardChainStrategy {
    private static final Logger logger = Logger.getLogger(RoundRobinStrategy.class);

    /** Executes individual rules; guaranteed non-null by the constructor. */
    private final AbstractRuleExecutionStrategy ruleStrategy;
    /** True between a call to {@link #initialize(Ruleset)} and the end of forward chaining. */
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    /** Number of the iteration in progress; 0 until the first iteration starts. */
    private int iteration;
    /** The full ruleset supplied to {@link #initialize(Ruleset)}. */
    private Ruleset ruleset;
    /** Rules still waiting to be executed in the current iteration. */
    private Set<Rule> activeNow;
    /** Rules flagged for execution in the following iteration. */
    private Set<Rule> activeNextIteration;
    /** Running total of inferences reported by rules during the current iteration. */
    private long inferencesThisIteration;

    /**
     * Instantiate a RoundRobinStrategy by providing the RuleExecutionStrategy.
     * @param ruleStrategy Defines how to execute individual rules; not null.
     */
    public RoundRobinStrategy(AbstractRuleExecutionStrategy ruleStrategy) {
        // checkNotNull hands back its argument, so validation and assignment
        // can be a single statement.
        this.ruleStrategy = Preconditions.checkNotNull(ruleStrategy);
    }

    /**
     * Resets all iteration state, schedules every rule of the given ruleset
     * for the first iteration, and marks the strategy as active.
     * @param withRuleset The rules to forward chain over; not null.
     * @throws ForwardChainException if preparing the first iteration fails.
     */
    @Override
    public void initialize(Ruleset withRuleset) throws ForwardChainException {
        ruleset = Preconditions.checkNotNull(withRuleset);
        iteration = 0;
        activeNow = new HashSet<>();
        activeNextIteration = new HashSet<>(ruleset.getRules());
        logger.info("Initializing round robin forward chaining, with " +
                activeNextIteration.size() + " rules.");
        initialized.set(true);
        prepareQueue();
    }

    /**
     * Rolls over to the next iteration once the current one has no rules left
     * to execute, or finishes forward chaining if nothing was flagged for the
     * next iteration. Does nothing while the strategy is inactive or the
     * current iteration still has pending rules.
     * @throws ForwardChainException if shutting down the rule strategy fails.
     */
    private void prepareQueue() throws ForwardChainException {
        if (!initialized.get() || !activeNow.isEmpty()) {
            return;
        }
        if (iteration > 0) {
            logger.info("Finished iteration " + iteration + "; made " +
                    inferencesThisIteration + " inferences.");
        }
        if (activeNextIteration.isEmpty()) {
            logger.info("Finished forward chaining after " + iteration + " iterations.");
            setDone();
            return;
        }
        // The rule strategy is handed the iteration counter as it stood
        // before being advanced for the new iteration.
        ruleStrategy.setRequiredLevel(iteration);
        iteration++;
        inferencesThisIteration = 0;
        // Promote the rules flagged for "next" to "now".
        activeNow.addAll(activeNextIteration);
        activeNextIteration.clear();
        logger.info("Beginning iteration " + iteration + ", with " +
                activeNow.size() + " rules to execute...");
    }

    /**
     * Marks the strategy inactive and shuts down the rule execution strategy.
     * @throws ForwardChainException if the rule strategy fails to shut down.
     */
    private void setDone() throws ForwardChainException {
        initialized.set(false);
        // ruleStrategy is final and null-checked in the constructor, so it can
        // be used here unconditionally.
        ruleStrategy.shutDown();
    }

    /**
     * @return true if the strategy has been initialized and forward chaining
     *  has not yet finished.
     */
    @Override
    public boolean isActive() {
        return initialized.get();
    }

    /**
     * Executes the highest-priority rule remaining in the current iteration,
     * tagging its execution with the current iteration number as derivation
     * time, and flags its successors for later execution if it reported any
     * inferences.
     * @return The number of inferences reported by the rule, or 0 if the
     *  strategy is inactive or has no rule to execute.
     * @throws ForwardChainException if rule execution or queue preparation fails.
     */
    @Override
    public long executeNext() throws ForwardChainException {
        final Rule rule = initialized.get() ? getNextRule() : null;
        if (rule == null) {
            return 0;
        }
        final RyaType derivationTime = new RyaType(XMLSchema.INT, Integer.toString(iteration));
        final StatementMetadata metadata = new StatementMetadata();
        metadata.addMetadata(ForwardChainConstants.RYA_DERIVATION_TIME, derivationTime);
        final long inferences = rule.execute(ruleStrategy, metadata);
        inferencesThisIteration += inferences;
        if (inferences > 0) {
            scheduleSuccessorsOf(rule);
        }
        prepareQueue();
        return inferences;
    }

    /**
     * Flags each successor of the given rule for the next iteration, unless
     * that successor is still pending in the current iteration.
     * @param rule A rule that has just reported at least one inference.
     */
    private void scheduleSuccessorsOf(Rule rule) {
        for (Rule successor : ruleset.getSuccessorsOf(rule)) {
            // If we'll handle the triggered rule in the current iteration,
            // it may not need to be checked in the next one.
            if (activeNow.contains(successor)) {
                continue;
            }
            activeNextIteration.add(successor);
        }
    }

    /**
     * Selects the rule to execute next and removes it from the set of rules
     * pending in the current iteration.
     * @return The selected rule, or null if no rules are pending.
     */
    private Rule getNextRule() {
        if (activeNow.isEmpty()) {
            return null;
        }
        // Trigger relationships are evaluated among the pending rules only.
        final Ruleset pending = new Ruleset(activeNow);
        // If one rule triggers the other (directly or indirectly) but not the
        // other way around, the one that triggers the other should come first.
        final Comparator<Rule> byTriggerDirection = (first, second) -> {
            final boolean firstReachesSecond = pending.pathExists(first, second);
            final boolean secondReachesFirst = pending.pathExists(second, first);
            if (firstReachesSecond == secondReachesFirst) {
                return 0;
            }
            return firstReachesSecond ? -1 : 1;
        };
        final Comparator<Rule> priority = byTriggerDirection
                // Otherwise, prioritize rules that trigger many remaining
                // rules, and defer rules that can be triggered by many
                // remaining rules.
                .thenComparingInt(candidate ->
                        remainingPredecessors(candidate).size() - remainingSuccessors(candidate).size())
                // Fall back on string comparison
                .thenComparing(Rule::toString);
        final SortedSet<Rule> ordered = new TreeSet<>(priority);
        ordered.addAll(activeNow);
        final Rule chosen = ordered.first();
        activeNow.remove(chosen);
        return chosen;
    }

    /**
     * @param rule The rule whose successors are of interest.
     * @return The successors of the rule, according to the full ruleset, that
     *  are still pending in the current iteration.
     */
    private Set<Rule> remainingSuccessors(Rule rule) {
        final Set<Rule> stillPending = new HashSet<>();
        for (Rule successor : ruleset.getSuccessorsOf(rule)) {
            if (activeNow.contains(successor)) {
                stillPending.add(successor);
            }
        }
        return stillPending;
    }

    /**
     * @param rule The rule whose predecessors are of interest.
     * @return The predecessors of the rule, according to the full ruleset,
     *  that are still pending in the current iteration.
     */
    private Set<Rule> remainingPredecessors(Rule rule) {
        final Set<Rule> stillPending = new HashSet<>();
        for (Rule predecessor : ruleset.getPredecessorsOf(rule)) {
            if (activeNow.contains(predecessor)) {
                stillPending.add(predecessor);
            }
        }
        return stillPending;
    }
}