blob: 9e1be472afaec0c4598cdbac2ae89f60566a8f2c [file] [log] [blame]
package org.apache.rya.reasoning.mr;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.IOException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;
import org.apache.rya.accumulo.mr.RyaStatementWritable;
import org.apache.rya.reasoning.Derivation;
import org.apache.rya.reasoning.Fact;
import org.apache.rya.reasoning.LocalReasoner;
import org.apache.rya.reasoning.LocalReasoner.Relevance;
import org.apache.rya.reasoning.Schema;
import org.eclipse.rdf4j.model.Resource;
public class ForwardChain extends AbstractReasoningTool {
@Override
protected void configureReasoningJob(String[] args) throws Exception {
distributeSchema();
Configuration conf = job.getConfiguration();
// We can ignore irrelevant triples, unless the schema has just changed
// and therefore we can't rely on previous determinations of relevance.
configureMultipleInput(TableMapper.class, RdfMapper.class,
FileMapper.class, !MRReasoningUtils.isSchemaNew(conf));
job.setMapOutputKeyClass(ResourceWritable.class);
job.setMapOutputValueClass(Fact.class);
job.setReducerClass(ReasoningReducer.class);
job.setSortComparatorClass(ResourceWritable.SecondaryComparator.class);
job.setGroupingComparatorClass(ResourceWritable.PrimaryComparator.class);
configureDerivationOutput(true);
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new ForwardChain(), args));
}
/**
* Decide whether to output facts and with what keys. Subclasses handle
* different sources of input.
*/
public static class ForwardChainMapper<K, V> extends Mapper<K, V,
ResourceWritable, Fact> {
protected Schema schema;
protected ResourceWritable node = new ResourceWritable();
protected MultipleOutputs<?, ?> debugOut;
protected boolean debug;
private Text debugKey = new Text();
private Text debugValue = new Text();
public ForwardChainMapper(Schema s) {
this.schema = s;
}
public ForwardChainMapper() {}
@Override
protected void setup(Context context) {
debugOut = new MultipleOutputs<>(context);
Configuration conf = context.getConfiguration();
if (schema == null) {
schema = MRReasoningUtils.loadSchema(context.getConfiguration());
}
debug = MRReasoningUtils.debug(conf);
}
@Override
public void cleanup(Context context) throws IOException,
InterruptedException {
if (debugOut != null) {
debugOut.close();
}
}
protected void process(Context context, Fact inputTriple)
throws IOException, InterruptedException {
Relevance rel = LocalReasoner.relevantFact(inputTriple, schema);
if (rel.subject()) {
node.set(inputTriple.getSubject(), 1);
context.write(node, inputTriple);
if (debug) {
int i = inputTriple.getIteration();
debugKey.set("MAP_OUT" + node.toString());
debugValue.set(inputTriple.explain(false) + "[" + i + "]");
debugOut.write(MRReasoningUtils.DEBUG_OUT, debugKey,
debugValue);
}
}
if (rel.object()) {
node.set((Resource) inputTriple.getObject(), -1);
context.write(node, inputTriple);
if (debug) {
int i = inputTriple.getIteration();
debugKey.set("MAP_OUT" + node.toString());
debugValue.set(inputTriple.explain(false) + "[" + i + "]");
debugOut.write(MRReasoningUtils.DEBUG_OUT, debugKey,
debugValue);
}
}
}
}
/**
* Get input data from the database
*/
public static class TableMapper extends ForwardChainMapper<Key, Value> {
private Fact inputTriple = new Fact();
public TableMapper() { super(); }
public TableMapper(Schema s) { super(s); }
@Override
public void map(Key row, Value data, Context context)
throws IOException, InterruptedException {
inputTriple.setTriple(MRReasoningUtils.getStatement(row, data,
context.getConfiguration()));
process(context, inputTriple);
}
}
/**
* Get intermediate data from a sequence file
*/
public static class FileMapper extends ForwardChainMapper<Fact,
NullWritable> {
public FileMapper() { super(); }
public FileMapper(Schema s) { super(s); }
@Override
public void map(Fact inputTriple, NullWritable nw,
Context context) throws IOException, InterruptedException {
process(context, inputTriple);
}
}
/**
* Get input data from an RDF file
*/
public static class RdfMapper extends ForwardChainMapper<LongWritable,
RyaStatementWritable> {
private Fact inputTriple = new Fact();
public RdfMapper() { super(); }
public RdfMapper(Schema s) { super(s); }
@Override
public void map(LongWritable key, RyaStatementWritable rsw,
Context context) throws IOException, InterruptedException {
inputTriple.setTriple(rsw.getRyaStatement());
process(context, inputTriple);
}
}
public static class ReasoningReducer extends Reducer<ResourceWritable,
Fact, Fact, NullWritable> {
private static final int LOG_INTERVAL = 5000;
private Logger log = Logger.getLogger(ReasoningReducer.class);
private MultipleOutputs<?, ?> mout;
private Schema schema;
private boolean debug;
private Text debugK = new Text();
private Text debugV = new Text();
private int maxStored = 0;
private String maxNode = "";
public ReasoningReducer(Schema s) {
this.schema = s;
}
public ReasoningReducer() {}
@Override
public void setup(Context context) {
mout = new MultipleOutputs<>(context);
Configuration conf = context.getConfiguration();
if (schema == null) {
schema = MRReasoningUtils.loadSchema(conf);
}
debug = MRReasoningUtils.debug(conf);
}
@Override
public void cleanup(Context context) throws IOException,
InterruptedException {
if (mout != null) {
mout.close();
}
log.info("Most input triples stored at one time by any reasoner: "
+ maxStored + " (reasoner for node: " + maxNode + ")");
}
@Override
public void reduce(ResourceWritable key, Iterable<Fact> facts,
Context context) throws IOException, InterruptedException {
log.debug("Reasoning for node " + key.toString());
// If the schema was just updated, all facts are potentially
// meaningful again. Otherwise, any new derivation must use at
// least one fact from the previous (or this) iteration.
Configuration conf = context.getConfiguration();
LocalReasoner reasoner = new LocalReasoner(key.get(), schema,
MRReasoningUtils.getCurrentIteration(conf),
MRReasoningUtils.lastSchemaUpdate(conf));
long numInput = 0;
long numOutput = 0;
for (Fact fact : facts) {
if (debug) {
debugK.set("INPUT<" + key.get().stringValue() + ">");
debugV.set(fact.toString());
mout.write(MRReasoningUtils.DEBUG_OUT, debugK, debugV);
}
// We actually need separate fact objects, as the reasoner might
// store them (default is to reuse the same object each time)
reasoner.processFact(fact.clone());
numInput++;
numOutput += handleResults(reasoner, context);
if (numInput % LOG_INTERVAL == 0) {
log.debug(reasoner.getDiagnostics());
log.debug(numInput + " input triples so far");
log.debug(numOutput + " output triples/inconsistencies so far");
}
}
reasoner.getTypes();
numOutput += handleResults(reasoner, context);
int numStored = reasoner.getNumStored();
if (numStored > maxStored) {
maxStored = numStored;
maxNode = key.toString();
}
log.debug("..." + numStored + " input facts stored in memory");
}
/**
* Process any new results from a reasoner.
*/
private long handleResults(LocalReasoner reasoner, Context context)
throws IOException, InterruptedException {
long numOutput = 0;
if (reasoner.hasNewFacts()) {
for (Fact fact : reasoner.getFacts()) {
mout.write(getOutputName(fact), fact, NullWritable.get());
numOutput++;
if (debug) {
debugK.set("OUTPUT<" + reasoner.getNode().stringValue() + ">");
debugV.set(fact.explain(false));
mout.write(MRReasoningUtils.DEBUG_OUT, debugK, debugV);
}
}
}
if (reasoner.hasInconsistencies()) {
for (Derivation inconsistency : reasoner.getInconsistencies()) {
mout.write(getOutputName(inconsistency), inconsistency,
NullWritable.get());
numOutput++;
if (debug) {
debugK.set("OUTPUT<" + inconsistency.getNode().stringValue() + ">");
debugV.set(inconsistency.explain(false));
mout.write(MRReasoningUtils.DEBUG_OUT, debugK, debugV);
}
}
}
return numOutput;
}
}
}