package org.apache.rya.reasoning.mr;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.IOException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.mapreduce.AccumuloInputFormat;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.lib.input.CombineSequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.rya.accumulo.mr.MRUtils;
import org.apache.rya.accumulo.mr.RdfFileInputFormat;
import org.apache.rya.accumulo.mr.RyaStatementWritable;
import org.apache.rya.reasoning.Derivation;
import org.apache.rya.reasoning.Fact;
import org.apache.rya.reasoning.Schema;
import org.eclipse.rdf4j.rio.RDFFormat;
/**
* Contains common functionality for MapReduce jobs involved in reasoning. A
* subclass should implement {@link #configureReasoningJob(String[])} and its
* own mappers and reducers.
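*
* <p>A minimal sketch of a concrete subclass (class and mapper names here are
* illustrative, not part of this API):</p>
* <pre>{@code
* public class ForwardChainTool extends AbstractReasoningTool {
*     protected void configureReasoningJob(String[] args) throws Exception {
*         distributeSchema();
*         configureMultipleInput(TableMapper.class, RdfMapper.class,
*             FileMapper.class, true);
*         job.setReducerClass(ForwardChainReducer.class);
*         configureDerivationOutput();
*     }
* }
* }</pre>
* Such a tool would typically be launched with
* {@code ToolRunner.run(conf, new ForwardChainTool(), args)}.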
*/
public abstract class AbstractReasoningTool extends Configured implements Tool {
// Keep track of statistics about the input
protected enum COUNTERS { ABOX, TBOX, USEFUL }
// MapReduce job, to be configured by subclasses
protected Job job;
/**
* Configure the job's inputs, outputs, mappers, and reducers.
*/
protected abstract void configureReasoningJob(String[] args) throws Exception;
/**
* Configure and run a MapReduce job.
*/
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
job = Job.getInstance(conf);
job.setJobName(getJobName());
job.setJarByClass(this.getClass());
configureReasoningJob(args);
// Run verbosely unless we're collecting statistics
boolean success = job.waitForCompletion(!MRReasoningUtils.stats(conf));
return success ? 0 : 1;
}
/**
* Cumulative CPU time taken by all mappers/reducers.
*/
public long getCumulativeTime() throws IOException {
return getCounter(TaskCounter.CPU_MILLISECONDS);
}
/**
* Default name for the MapReduce job.
*/
protected String getJobName() {
return "Rya reasoning, pass " + MRReasoningUtils.getCurrentIteration(getConf())
+ ": " + this.getClass().getSimpleName() + "_" + System.currentTimeMillis();
}
/**
* Number of inconsistencies detected by this job.
*/
public long getNumInconsistencies() throws IOException {
return getCounter(MultipleOutputs.class.getName(),
MRReasoningUtils.INCONSISTENT_OUT);
}
/**
* Number of new schema triples derived during this job.
*/
public long getNumSchemaTriples() throws IOException {
return getCounter(MultipleOutputs.class.getName(),
MRReasoningUtils.SCHEMA_OUT);
}
/**
* Number of new instance triples that might be used for future reasoning.
*/
public long getNumUsefulOutput() throws IOException {
return getCounter(MultipleOutputs.class.getName(),
MRReasoningUtils.INTERMEDIATE_OUT);
}
/**
* Number of new instance triples that will not be used for future reasoning.
*/
public long getNumTerminalOutput() throws IOException {
return getCounter(MultipleOutputs.class.getName(),
MRReasoningUtils.TERMINAL_OUT);
}
/**
* Total number of new instance triples derived during this job.
*/
public long getNumInstanceTriples() throws IOException {
return getNumUsefulOutput() + getNumTerminalOutput();
}
/**
* Number of instance triples seen as input during this job.
*/
public long getNumInstanceInput() throws IOException {
return getCounter(COUNTERS.ABOX);
}
/**
* Number of schema triples seen as input during this job.
*/
public long getNumSchemaInput() throws IOException {
return getCounter(COUNTERS.TBOX);
}
/**
* Increment the schema or instance triple counter, as appropriate.
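* <p>For example, a mapper processing a {@link Fact} might call
* (illustrative):</p>
* <pre>{@code
* countInput(Schema.isSchemaTriple(fact.getTriple()), context);
* }</pre>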
*/
protected static void countInput(boolean schema, TaskAttemptContext context) {
if (schema) {
context.getCounter(COUNTERS.TBOX).increment(1);
}
else {
context.getCounter(COUNTERS.ABOX).increment(1);
}
}
/**
* Add the schema file (TBox) to the distributed cache for the current job.
*/
protected void distributeSchema() {
Path schemaPath = MRReasoningUtils.getSchemaPath(job.getConfiguration());
job.addCacheFile(schemaPath.toUri());
}
/**
* Set up the MapReduce job to use as inputs both an Accumulo table and the
* files containing previously derived information, excluding
* inconsistencies. Looks for a file for every iteration number so far,
* preferring final cleaned up output from that iteration but falling back
* on intermediate data if necessary.
* @param tableMapper Mapper class to use for database input
* @param rdfMapper Mapper class to use for direct RDF input
* @param fileMapper Mapper class to use for derived triples input
* @param filter True to exclude previously derived data that couldn't be
* used to derive anything new at this point.
*/
protected void configureMultipleInput(
Class<? extends Mapper<Key, Value, ?, ?>> tableMapper,
Class<? extends Mapper<LongWritable, RyaStatementWritable, ?, ?>> rdfMapper,
Class<? extends Mapper<Fact, NullWritable, ?, ?>> fileMapper,
boolean filter) throws IOException, AccumuloSecurityException {
Path inputPath = MRReasoningUtils.getInputPath(job.getConfiguration());
if (inputPath != null) {
configureRdfInput(inputPath, rdfMapper);
}
else {
configureAccumuloInput(tableMapper);
}
configureFileInput(fileMapper, filter);
}
/**
* Set up the MapReduce job to use as inputs both an Accumulo table and the
* files containing previously derived information. Looks for a file for
* every iteration number so far, preferring final cleaned up output from
* that iteration but falling back on intermediate data if necessary.
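* <p>For illustration, a subclass that also consumes previously detected
* inconsistencies might call (mapper names hypothetical):</p>
* <pre>{@code
* configureMultipleInput(TableMapper.class, RdfMapper.class,
*     FileMapper.class, InconsistencyMapper.class, true);
* }</pre>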
* @param tableMapper Mapper class to use for database input
* @param rdfMapper Mapper class to use for direct RDF input
* @param fileMapper Mapper class to use for derived triples input
* @param incMapper Mapper class to use for derived inconsistencies input
* @param filter True to exclude previously derived data that couldn't be
* used to derive anything new at this point.
*/
protected void configureMultipleInput(
Class<? extends Mapper<Key, Value, ?, ?>> tableMapper,
Class<? extends Mapper<LongWritable, RyaStatementWritable, ?, ?>> rdfMapper,
Class<? extends Mapper<Fact, NullWritable, ?, ?>> fileMapper,
Class<? extends Mapper<Derivation, NullWritable, ?, ?>> incMapper,
boolean filter)
throws IOException, AccumuloSecurityException {
Path inputPath = MRReasoningUtils.getInputPath(job.getConfiguration());
if (inputPath != null) {
configureRdfInput(inputPath, rdfMapper);
}
else {
configureAccumuloInput(tableMapper);
}
configureFileInput(fileMapper, incMapper, filter);
}
/**
* Set up the MapReduce job to use file inputs from previous iterations,
* excluding inconsistencies found.
* @param fileMapper Mapper class to use for generated triples
* @param filter Exclude facts that aren't helpful for inference
*/
protected void configureFileInput(
Class <? extends Mapper<Fact, NullWritable, ?, ?>> fileMapper,
final boolean filter) throws IOException {
configureFileInput(fileMapper, null, filter);
}
/**
* Set up the MapReduce job to use file inputs from previous iterations.
* @param fileMapper Mapper class for generated triples
* @param incMapper Mapper class for generated inconsistencies
* @param filter Exclude facts that aren't helpful for inference
*/
protected void configureFileInput(
Class <? extends Mapper<Fact, NullWritable, ?, ?>> fileMapper,
Class <? extends Mapper<Derivation, NullWritable, ?, ?>> incMapper,
final boolean filter) throws IOException {
// Set up file input for all iterations up to this one
Configuration conf = job.getConfiguration();
FileSystem fs = FileSystem.get(conf);
Path inputPath;
int iteration = MRReasoningUtils.getCurrentIteration(conf);
// Set min/max split sizes, if not already provided. Use getLongBytes so
// values with a size suffix (e.g. "128m") parse correctly, and fall back
// to 128MB if dfs.blocksize is unset.
long blocksize = conf.getLongBytes("dfs.blocksize", 128 * 1024 * 1024);
String minSplitProp = "mapreduce.input.fileinputformat.split.minsize";
String maxSplitProp = "mapreduce.input.fileinputformat.split.maxsize";
conf.set(minSplitProp, conf.get(minSplitProp, String.valueOf(blocksize)));
conf.set(maxSplitProp, conf.get(maxSplitProp, String.valueOf(blocksize * 8)));
for (int i = 1; i <= iteration; i++) {
// Prefer cleaned output...
inputPath = MRReasoningUtils.getOutputPath(conf,
MRReasoningUtils.OUTPUT_BASE + i);
// But if there isn't any, try intermediate data:
if (!fs.isDirectory(inputPath)) {
inputPath = MRReasoningUtils.getOutputPath(conf,
MRReasoningUtils.OUTPUT_BASE + i
+ MRReasoningUtils.TEMP_SUFFIX);
}
// And only proceed if we found one or the other.
if (fs.isDirectory(inputPath)) {
// Never include debug output. If filter is true, select only
// intermediate and schema data, otherwise include everything.
PathFilter f = new PathFilter() {
@Override
public boolean accept(Path path) {
String s = path.getName();
if (s.startsWith(MRReasoningUtils.DEBUG_OUT)) {
return false;
}
else {
return !filter
|| s.startsWith(MRReasoningUtils.INTERMEDIATE_OUT)
|| s.startsWith(MRReasoningUtils.SCHEMA_OUT);
}
}
};
for (FileStatus status : fs.listStatus(inputPath, f)) {
if (status.getLen() > 0) {
Path p = status.getPath();
String s = p.getName();
if (s.startsWith(MRReasoningUtils.INCONSISTENT_OUT)) {
if (incMapper != null) {
MultipleInputs.addInputPath(job, p,
CombineSequenceFileInputFormat.class, incMapper);
}
}
else {
MultipleInputs.addInputPath(job, p,
CombineSequenceFileInputFormat.class, fileMapper);
}
}
}
}
}
}
/**
* Set up the MapReduce job to use Accumulo as an input.
* @param tableMapper Mapper class to use
*/
protected void configureAccumuloInput(Class<? extends Mapper<Key,Value,?,?>> tableMapper)
throws AccumuloSecurityException {
MRReasoningUtils.configureAccumuloInput(job);
// MultipleInputs requires a path, but AccumuloInputFormat doesn't use it,
// so the value given here is an arbitrary placeholder:
MultipleInputs.addInputPath(job, new Path("/tmp/input"),
AccumuloInputFormat.class, tableMapper);
}
/**
* Set up the MapReduce job to use an RDF file as an input.
* @param inputPath Path of the RDF input file or directory
* @param rdfMapper Mapper class to use
*/
protected void configureRdfInput(Path inputPath,
Class<? extends Mapper<LongWritable, RyaStatementWritable, ?, ?>> rdfMapper) {
Configuration conf = job.getConfiguration();
// Default to RDF/XML if no serialization format was configured
String format = conf.get(MRUtils.FORMAT_PROP, RDFFormat.RDFXML.getName());
conf.set(MRUtils.FORMAT_PROP, format);
MultipleInputs.addInputPath(job, inputPath,
RdfFileInputFormat.class, rdfMapper);
}
/**
* Set up the MapReduce job to output a schema (TBox).
*/
protected void configureSchemaOutput() {
Path outPath = MRReasoningUtils.getSchemaPath(job.getConfiguration());
SequenceFileOutputFormat.setOutputPath(job, outPath);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(SchemaWritable.class);
LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
MultipleOutputs.addNamedOutput(job, "schemaobj",
SequenceFileOutputFormat.class, NullWritable.class, SchemaWritable.class);
MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.setCountersEnabled(job, true);
}
/**
* Set up the MapReduce job to output newly derived triples. Outputs to
* directory [base]-[iteration].
*/
protected void configureDerivationOutput() {
configureDerivationOutput(false);
}
/**
* Set up a MapReduce job to output newly derived triples.
* @param intermediate True if this is intermediate data. Outputs
* to [base]-[iteration]-[temp].
*/
protected void configureDerivationOutput(boolean intermediate) {
Path outPath;
Configuration conf = job.getConfiguration();
int iteration = MRReasoningUtils.getCurrentIteration(conf);
if (intermediate) {
outPath = MRReasoningUtils.getOutputPath(conf,
MRReasoningUtils.OUTPUT_BASE + iteration
+ MRReasoningUtils.TEMP_SUFFIX);
}
else {
outPath = MRReasoningUtils.getOutputPath(conf,
MRReasoningUtils.OUTPUT_BASE + iteration);
}
SequenceFileOutputFormat.setOutputPath(job, outPath);
LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INTERMEDIATE_OUT,
SequenceFileOutputFormat.class, Fact.class, NullWritable.class);
MultipleOutputs.addNamedOutput(job, MRReasoningUtils.TERMINAL_OUT,
SequenceFileOutputFormat.class, Fact.class, NullWritable.class);
MultipleOutputs.addNamedOutput(job, MRReasoningUtils.SCHEMA_OUT,
SequenceFileOutputFormat.class, Fact.class, NullWritable.class);
MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INCONSISTENT_OUT,
SequenceFileOutputFormat.class, Derivation.class, NullWritable.class);
MultipleOutputs.setCountersEnabled(job, true);
// Set up an output for diagnostic info, if needed
MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
TextOutputFormat.class, Text.class, Text.class);
}
/**
* Set up a MapReduce job to output human-readable text.
* @param destination Name used to construct the output directory path
*/
protected void configureTextOutput(String destination) {
Path outPath = MRReasoningUtils.getOutputPath(job.getConfiguration(), destination);
TextOutputFormat.setOutputPath(job, outPath);
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INTERMEDIATE_OUT,
TextOutputFormat.class, NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, MRReasoningUtils.TERMINAL_OUT,
TextOutputFormat.class, NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, MRReasoningUtils.SCHEMA_OUT,
TextOutputFormat.class, NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, MRReasoningUtils.INCONSISTENT_OUT,
TextOutputFormat.class, NullWritable.class, Text.class);
MultipleOutputs.addNamedOutput(job, MRReasoningUtils.DEBUG_OUT,
TextOutputFormat.class, Text.class, Text.class);
MultipleOutputs.setCountersEnabled(job, true);
}
/**
* Get the name of the output to send an inconsistency to.
* @param inconsistency The inconsistency itself (currently unused: all
*        inconsistencies share one output)
* @return The name of the output file(s) to send inconsistencies to
*/
protected static String getOutputName(Derivation inconsistency) {
return MRReasoningUtils.INCONSISTENT_OUT;
}
/**
* Get the name of the output to send a fact to.
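* <p>Illustrative use from a reducer, assuming {@code mout} is the task's
* {@link MultipleOutputs} instance:</p>
* <pre>{@code
* mout.write(getOutputName(fact, false), fact, NullWritable.get());
* }</pre>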
* @param fact The fact itself
* @param finalOut True if this is for final output, not intermediate
* @return The name of the output file(s) to send this fact to
*/
protected static String getOutputName(Fact fact, boolean finalOut) {
if (Schema.isSchemaTriple(fact.getTriple())) {
return MRReasoningUtils.SCHEMA_OUT;
}
else if (!finalOut && fact.isUseful()) {
return MRReasoningUtils.INTERMEDIATE_OUT;
}
else {
return MRReasoningUtils.TERMINAL_OUT;
}
}
/**
* Get the name of the output to send a fact to.
*/
protected static String getOutputName(Fact fact) {
return getOutputName(fact, false);
}
/**
* Retrieve an arbitrary counter's value.
* @param group Counter's group name
* @param counter Name of the counter itself
*/
public long getCounter(String group, String counter) throws IOException {
return job.getCounters().findCounter(group, counter).getValue();
}
/**
* Retrieve an arbitrary counter's value.
* @param key The Enum tied to this counter
*/
public long getCounter(Enum<?> key) throws IOException {
return job.getCounters().findCounter(key).getValue();
}
/**
* Get the current iteration according to this job's configuration.
*/
public int getIteration() {
return MRReasoningUtils.getCurrentIteration(getConf());
}
/**
* Get the job's JobID.
*/
public JobID getJobID() {
return job.getJobID();
}
/**
* Get the elapsed wall-clock time, assuming the job is done.
*/
public long getElapsedTime() throws IOException, InterruptedException {
return job.getFinishTime() - job.getStartTime();
}
}