package org.apache.rya.reasoning.mr;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.io.IOException;
import org.apache.rya.accumulo.mr.RyaStatementWritable;
import org.apache.rya.reasoning.Fact;
import org.apache.rya.reasoning.Schema;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Logger;

/**
 * Collects the schema information found in the input (the Rya Accumulo
 * table, RDF input files, or previously serialized facts) and outputs the
 * schema (TBox) to a file.
 */
public class SchemaFilter extends AbstractReasoningTool {
    @Override
    protected void configureReasoningJob(String[] args) throws Exception {
        configureMultipleInput(SchemaTableMapper.class, SchemaRdfMapper.class,
            SchemaFileMapper.class, true);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Fact.class);
        job.setReducerClass(SchemaFilterReducer.class);
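        // Route every schema triple to the single reducer so the complete
        // schema can be assembled in one Schema object.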
        job.setNumReduceTasks(1);
        configureSchemaOutput();
    }
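
    /**
     * Runs as a standard Hadoop tool via {@link ToolRunner}. Invocation
     * sketch (jar name and options are deployment-specific, shown only for
     * illustration): {@code hadoop jar <rya-reasoning-jar>
     * org.apache.rya.reasoning.mr.SchemaFilter <configuration options>}
     */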
    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new SchemaFilter(), args));
    }

    public static class SchemaTableMapper extends Mapper<Key, Value,
            NullWritable, Fact> {
        private final Fact fact = new Fact();

        /**
         * Output a triple if it is schema information.
         */
        @Override
        public void map(Key row, Value data, Context context)
                throws IOException, InterruptedException {
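            // Keep only schema (TBox) triples. Illustration (URIs
            // hypothetical): "ex:Dog rdfs:subClassOf ex:Animal" is schema
            // information, while instance data (ABox) such as
            // "ex:rex rdf:type ex:Dog" is filtered out.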
            fact.setTriple(MRReasoningUtils.getStatement(row, data,
                context.getConfiguration()));
            boolean isSchemaTriple = Schema.isSchemaTriple(fact.getTriple());
            if (isSchemaTriple) {
                context.write(NullWritable.get(), fact);
            }
            countInput(isSchemaTriple, context);
        }
    }

    public static class SchemaFileMapper extends Mapper<Fact,
            NullWritable, NullWritable, Fact> {
        /**
         * For a given fact, output it if it's a schema triple.
         */
        @Override
        public void map(Fact fact, NullWritable nw, Context context)
                throws IOException, InterruptedException {
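            // These facts were read back from intermediate files (e.g. the
            // serialized output of an earlier pass), so they are re-filtered
            // the same way as the table and RDF input.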
            if (Schema.isSchemaTriple(fact.getTriple())) {
                context.write(NullWritable.get(), fact);
            }
        }
    }

    public static class SchemaRdfMapper extends Mapper<LongWritable,
            RyaStatementWritable, NullWritable, Fact> {
        private final Fact fact = new Fact();

        /**
         * For a given statement parsed from RDF input, output it if it's a
         * schema triple.
         */
        @Override
        public void map(LongWritable key, RyaStatementWritable rsw, Context context)
                throws IOException, InterruptedException {
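            // Wrap the incoming RyaStatement in a Fact so it can be filtered
            // and counted like the other input sources.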
            fact.setTriple(rsw.getRyaStatement());
            boolean isSchemaTriple = Schema.isSchemaTriple(fact.getTriple());
            if (isSchemaTriple) {
                context.write(NullWritable.get(), fact);
            }
            countInput(isSchemaTriple, context);
        }
    }

    public static class SchemaFilterReducer extends Reducer<NullWritable,
            Fact, NullWritable, SchemaWritable> {
        private SchemaWritable schema;
        private final Logger log = Logger.getLogger(SchemaFilterReducer.class);
        private static final int LOG_INTERVAL = 1000;
        private boolean debug = false;
        private MultipleOutputs<?, ?> debugOut;
        private final Text debugKey = new Text();
        private final Text debugValue = new Text();

        @Override
        protected void setup(Context context) {
            schema = new SchemaWritable();
            debug = MRReasoningUtils.debug(context.getConfiguration());
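            // MultipleOutputs provides a separate named output for
            // human-readable debug records alongside the main schema output.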
            debugOut = new MultipleOutputs<>(context);
        }

        /**
         * Collect all schema information into a single Schema object. The
         * derivation of additional schema information and the serialization
         * to an HDFS file happen afterwards, in {@link #cleanup}.
         */
        @Override
        protected void reduce(NullWritable key, Iterable<Fact> triples,
                Context context) throws IOException, InterruptedException {
            long count = 0;
            for (Fact fact : triples) {
                schema.processTriple(fact.getTriple());
                count++;
                // Periodically log a progress summary
                if (count % LOG_INTERVAL == 0) {
                    log.debug("After " + count + " schema triples...");
                    log.debug(schema.getSummary());
                }
                // In debug mode, also record each triple's explanation
                if (debug) {
                    debugKey.set("SCHEMA TRIPLE " + count);
                    debugValue.set(fact.explain(false));
                    debugOut.write(MRReasoningUtils.DEBUG_OUT, debugKey, debugValue);
                }
            }
            log.debug("Total: " + count + " schema triples");
            log.debug(schema.getSummary());
        }

        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            if (debugOut != null) {
                debugOut.close();
            }
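            // Closure derives whatever additional schema the collected
            // triples imply; for illustration, RDFS-style closure would turn
            // (A rdfs:subClassOf B) and (B rdfs:subClassOf C) into
            // (A rdfs:subClassOf C).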
            // Perform schema-level reasoning
            schema.closure();
            // Output the complete schema
            context.write(NullWritable.get(), schema);
        }
    }
}