blob: 759ede955cfc5a01d8c00595c5d9c60c3beadea2 [file] [log] [blame]
package org.apache.rya.reasoning.mr;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceFeeder;
import org.apache.hadoop.mrunit.types.KeyValueReuseList;
import org.apache.hadoop.mrunit.types.Pair;
import org.apache.rya.accumulo.mr.RyaStatementWritable;
import org.apache.rya.api.RdfCloudTripleStoreConstants.TABLE_LAYOUT;
import org.apache.rya.api.domain.RyaStatement;
import org.apache.rya.api.resolver.triple.TripleRow;
import org.apache.rya.api.resolver.triple.TripleRowResolver;
import org.apache.rya.api.resolver.triple.impl.WholeRowTripleResolver;
import org.apache.rya.reasoning.Fact;
import org.apache.rya.reasoning.OwlRule;
import org.apache.rya.reasoning.Schema;
import org.apache.rya.reasoning.TestUtils;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.vocabulary.OWL;
import org.eclipse.rdf4j.model.vocabulary.RDF;
import org.eclipse.rdf4j.model.vocabulary.RDFS;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@RunWith(PowerMockRunner.class)
@PrepareForTest({ForwardChain.FileMapper.class, ForwardChain.ReasoningReducer.class})
public class ForwardChainTest {
// inputs
static Fact X_SUB_Y = new Fact(
TestUtils.uri("x"), TestUtils.uri("subOrganizationOf"), TestUtils.uri("y"));
static Fact Y_SUB_Z = new Fact(
TestUtils.uri("y"), TestUtils.uri("subOrganizationOf"), TestUtils.uri("z"));
// inferences
static Fact Y_SUPER_X = new Fact(
TestUtils.uri("y"), TestUtils.uri("hasSubOrganization"), TestUtils.uri("x"),
1, OwlRule.PRP_INV, TestUtils.uri("y"));
static Fact X_SUB_Z = new Fact(
TestUtils.uri("x"), TestUtils.uri("subOrganizationOf"), TestUtils.uri("z"),
1, OwlRule.PRP_TRP, TestUtils.uri("y"));
// schema
static Fact TRP = new Fact(
TestUtils.uri("subOrganizationOf"), RDF.TYPE, OWL.TRANSITIVEPROPERTY);
static Fact SPO = new Fact(
TestUtils.uri("subOrganizationOf"), RDFS.SUBPROPERTYOF,
TestUtils.uri("affiliatedWith"));
static Fact INV = new Fact(
TestUtils.uri("hasSubOrganization"), OWL.INVERSEOF,
TestUtils.uri("subOrganizationOf"));
Schema schema;
@Before
public void setUp() {
schema = new Schema();
schema.processTriple(TRP.getTriple());
schema.closure();
Y_SUPER_X.addSource(X_SUB_Y);
X_SUB_Z.addSource(X_SUB_Y);
X_SUB_Z.addSource(Y_SUB_Z);
}
@Test
public void testTableMapperOutput() throws Exception {
RyaStatement rya = TestUtils.ryaStatement("x", "subOrganizationOf", "y");
TripleRowResolver trr = new WholeRowTripleResolver();
Map<TABLE_LAYOUT,TripleRow> map = trr.serialize(rya);
TripleRow tr = map.get(TABLE_LAYOUT.SPO);
byte[] b = new byte[0];
Key key = new Key(tr.getRow(), tr.getColumnFamily(),
tr.getColumnQualifier(), b, 1);
Value val = new Value(b);
ResourceWritable rw1 = new ResourceWritable();
ResourceWritable rw2 = new ResourceWritable();
rw1.set(TestUtils.uri("x"));
rw2.set(TestUtils.uri("y"));
new MapDriver<Key, Value, ResourceWritable, Fact>()
.withMapper(new ForwardChain.TableMapper(schema))
.withInput(key, val)
.withOutput(rw1, X_SUB_Y)
.withOutput(rw2, X_SUB_Y)
.runTest();
}
@Test
public void testFileMapperOutput() throws Exception {
ResourceWritable rw1 = new ResourceWritable();
ResourceWritable rw2 = new ResourceWritable();
rw1.set(TestUtils.uri("x"));
rw2.set(TestUtils.uri("y"));
new MapDriver<Fact, NullWritable, ResourceWritable, Fact>()
.withMapper(new ForwardChain.FileMapper(schema))
.withInput(X_SUB_Y, NullWritable.get())
.withOutput(rw1, X_SUB_Y)
.withOutput(rw2, X_SUB_Y)
.runTest();
}
@Test
public void testRdfMapperOutput() throws Exception {
RyaStatement rya = TestUtils.ryaStatement("x", "subOrganizationOf", "y");
RyaStatementWritable rsw = new RyaStatementWritable();
rsw.setRyaStatement(rya);
LongWritable l = new LongWritable();
ResourceWritable rw1 = new ResourceWritable();
ResourceWritable rw2 = new ResourceWritable();
rw1.set(TestUtils.uri("x"));
rw2.set(TestUtils.uri("y"));
new MapDriver<LongWritable, RyaStatementWritable, ResourceWritable,
Fact>()
.withMapper(new ForwardChain.RdfMapper(schema))
.withInput(l, rsw)
.withOutput(rw1, X_SUB_Y)
.withOutput(rw2, X_SUB_Y)
.runTest();
}
@Test
public void testReducerInference() throws Exception {
schema.processTriple(INV.getTriple());
schema.closure();
ResourceWritable rw = new ResourceWritable();
rw.set(TestUtils.uri("y"));
List<Fact> facts = new LinkedList<>();
facts.add(X_SUB_Y);
new ReduceDriver<ResourceWritable, Fact, Fact, NullWritable>()
.withReducer(new ForwardChain.ReasoningReducer(schema))
.withInput(rw, facts)
.withMultiOutput(MRReasoningUtils.INTERMEDIATE_OUT,
Y_SUPER_X, NullWritable.get())
.runTest();
}
@Test
public void testReducerJoin() throws Exception {
ResourceWritable rw = new ResourceWritable();
rw.set(TestUtils.uri("y"));
List<Fact> facts = new LinkedList<>();
facts.add(X_SUB_Y);
facts.add(Y_SUB_Z);
ReduceDriver<ResourceWritable, Fact, Fact,
NullWritable> driver = new ReduceDriver<>();
driver.getConfiguration().setInt(MRReasoningUtils.STEP_PROP, 1);
driver.withReducer(new ForwardChain.ReasoningReducer(schema))
.withInput(rw, facts)
.withMultiOutput(MRReasoningUtils.INTERMEDIATE_OUT,
X_SUB_Z, NullWritable.get())
.runTest();
}
/**
* MultipleOutputs support is minimal, so we have to check each map/reduce
* step explicitly
*/
@Test
public void testTransitiveChain() throws Exception {
int max = 8;
int n = 4;
IRI prop = TestUtils.uri("subOrganizationOf");
Map<Integer, Map<Integer, Pair<Fact, NullWritable>>> connections
= new HashMap<>();
for (int i = 0; i <= max; i++) {
connections.put(i, new HashMap<Integer, Pair<Fact, NullWritable>>());
}
// Initial input: make a chain from org0 to org8
for (int i = 0; i < max; i++) {
IRI orgI = TestUtils.uri("org" + i);
IRI orgJ = TestUtils.uri("org" + (i + 1));
Fact triple = new Fact(orgI, prop, orgJ);
connections.get(i).put(i+1, new Pair<>(triple, NullWritable.get()));
}
for (int i = 1; i <= n; i++) {
// Map:
MapDriver<Fact, NullWritable, ResourceWritable,
Fact> mDriver = new MapDriver<>();
mDriver.getConfiguration().setInt(MRReasoningUtils.STEP_PROP, i);
mDriver.setMapper(new ForwardChain.FileMapper(schema));
for (int j : connections.keySet()) {
for (int k : connections.get(j).keySet()) {
mDriver.addInput(connections.get(j).get(k));
}
}
List<Pair<ResourceWritable, Fact>> mapped = mDriver.run();
// Convert data for reduce phase:
ReduceFeeder<ResourceWritable, Fact> feeder =
new ReduceFeeder<>(mDriver.getConfiguration());
List<KeyValueReuseList<ResourceWritable, Fact>> intermediate
= feeder.sortAndGroup(mapped,
new ResourceWritable.SecondaryComparator(),
new ResourceWritable.PrimaryComparator());
// Reduce, and compare to expected output:
ReduceDriver<ResourceWritable, Fact, Fact,
NullWritable> rDriver = new ReduceDriver<>();
rDriver.getConfiguration().setInt(MRReasoningUtils.STEP_PROP, i);
rDriver.setReducer(new ForwardChain.ReasoningReducer(schema));
rDriver.addAllElements(intermediate);
int maxSpan = (int) Math.pow(2, i);
int minSpan = (maxSpan/2) + 1;
// For each j, build all paths starting with j:
for (int j = 0; j < max; j++) {
// This includes any path of length k for appropriate k:
for (int k = minSpan; k <= maxSpan && j+k <= max; k++) {
int middle = j + minSpan - 1;
IRI left = TestUtils.uri("org" + j);
IRI right = TestUtils.uri("org" + (j + k));
Fact triple = new Fact(left, prop,
right, i, OwlRule.PRP_TRP, TestUtils.uri("org" + middle));
triple.addSource(connections.get(j).get(middle).getFirst());
triple.addSource(connections.get(middle).get(j+k).getFirst());
Pair<Fact, NullWritable> expected =
new Pair<>(triple, NullWritable.get());
connections.get(j).put(j+k, expected);
rDriver.addMultiOutput("intermediate", expected);
}
}
rDriver.runTest();
}
}
}