blob: 2ac9f4589d63577be0c4a65c379fd3ffd7035272 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.indexing.pcj.functions.geo;
import static java.util.Objects.requireNonNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import javax.xml.datatype.DatatypeFactory;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.rya.api.client.RyaClient;
import org.apache.rya.api.client.accumulo.AccumuloConnectionDetails;
import org.apache.rya.api.client.accumulo.AccumuloRyaClientFactory;
import org.apache.rya.indexing.pcj.storage.PrecomputedJoinStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPcjStorage;
import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
import org.eclipse.rdf4j.model.IRI;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.algebra.evaluation.ValueExprEvaluationException;
import org.eclipse.rdf4j.query.algebra.evaluation.function.Function;
import org.eclipse.rdf4j.query.algebra.evaluation.function.FunctionRegistry;
import org.eclipse.rdf4j.query.impl.MapBindingSet;
import org.eclipse.rdf4j.repository.sail.SailRepositoryConnection;
import org.junit.Test;
import com.google.common.collect.Sets;
/**
* Performs integration tests PCJ Geospatial functions in SPARQL.
* Each test starts a Accumulo/Rya/Fluo single node stack and runs a continuous query, checking results.
*/
public class GeoFunctionsIT extends RyaExportITBase {
@Test
public void verifySpiLoadedGeoFunctions() {
final String functions[] = { "distance", "convexHull", "boundary", "envelope", "union", "intersection",
"symDifference", "difference", "relate", "sfDisjoint", "sfIntersects", "sfTouches", "sfCrosses",
"sfWithin", "sfContains", "sfOverlaps", "ehDisjoint", "ehMeet", "ehOverlap", "ehCovers", "ehCoveredBy",
"ehInside", "ehContains", "rcc8dc", "rcc8ec", "rcc8po", "rcc8tppi", "rcc8tpp", "rcc8ntpp", "rcc8ntppi" };
final HashSet<String> functionsCheckList = new HashSet<>();
functionsCheckList.addAll(Arrays.asList(functions));
for (final String f : FunctionRegistry.getInstance().getKeys()) {
final String functionShortName = f.replaceFirst("^.*/geosparql/(.*)", "$1");
functionsCheckList.remove(functionShortName);
}
assertTrue("Missed loading these functions via SPI: " + functionsCheckList, functionsCheckList.isEmpty());
}
@Test
public void withGeoFilters() throws Exception {
final String sparql =
"PREFIX geo: <http://www.opengis.net/ont/geosparql#> " +
"PREFIX ryageo: <tag:rya.apache.org,2017:function/geo#> " +
"PREFIX geof: <http://www.opengis.net/def/function/geosparql/> " +
"SELECT ?feature ?point ?wkt {" +
" ?feature a geo:Feature . " +
" ?feature geo:hasGeometry ?point . " +
" ?point a geo:Point . " +
" ?point geo:asWKT ?wkt . " +
" FILTER(ryageo:ehContains(?wkt, \"POLYGON((-77 39, -76 39, -76 38, -77 38, -77 39))\"^^geo:wktLiteral)) " +
"}";
final ValueFactory vf = SimpleValueFactory.getInstance();
final Set<Statement> statements = Sets.newHashSet(
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#feature"), vf.createIRI("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), vf.createIRI("http://www.opengis.net/ont/geosparql#Feature")),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#feature"), vf.createIRI("http://www.opengis.net/ont/geosparql#hasGeometry"), vf.createIRI("tag:rya.apache.org,2017:ex#test_point")),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#test_point"), vf.createIRI("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), vf.createIRI("http://www.opengis.net/ont/geosparql#Point")),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#test_point"), vf.createIRI("http://www.opengis.net/ont/geosparql#asWKT"), vf.createLiteral("Point(-77.03524 38.889468)", vf.createIRI("http://www.opengis.net/ont/geosparql#wktLiteral"))));
// Create a Geo function.
final Function geoFunction = new Function() {
@Override
public String getURI() {
return "tag:rya.apache.org,2017:function/geo#ehContains";
}
@Override
public Value evaluate(final ValueFactory valueFactory, final Value... args) throws ValueExprEvaluationException {
if (args.length != 2) {
throw new ValueExprEvaluationException(getURI() + " requires exactly 3 arguments, got " + args.length);
}
return valueFactory.createLiteral(true);
}
};
// Add our new function to the registry
FunctionRegistry.getInstance().add(geoFunction);
// The expected results of the SPARQL query once the PCJ has been computed.
final Set<BindingSet> expectedResults = new HashSet<>();
final MapBindingSet bs = new MapBindingSet();
bs.addBinding("wkt", vf.createLiteral("Point(-77.03524 38.889468)", vf.createIRI("http://www.opengis.net/ont/geosparql#wktLiteral")));
bs.addBinding("feature", vf.createIRI("tag:rya.apache.org,2017:ex#feature"));
bs.addBinding("point", vf.createIRI("tag:rya.apache.org,2017:ex#test_point"));
expectedResults.add(bs);
runTest(sparql, statements, expectedResults);
}
@Test
public void GeoDistance() throws Exception {
final String sparql =
"PREFIX geo: <http://www.opengis.net/ont/geosparql#> " +
"PREFIX geof: <http://www.opengis.net/def/function/geosparql/> " +
"PREFIX uom: <http://www.opengis.net/def/uom/OGC/1.0/> " +
"SELECT ?cityA ?cityB " +
"WHERE { " +
"?cityA <urn:containedIn> ?continent. " +
"?cityB <urn:containedIn> ?continent. " +
"?cityA geo:asWKT ?coord1 . " +
"?cityB geo:asWKT ?coord2 . " +
// from brussels 173km to amsterdam
" FILTER ( 500000 > geof:distance(?coord1, ?coord2, uom:metre) ) . " +
" FILTER ( !sameTerm (?cityA, ?cityB) ) " +
"}";
final ValueFactory vf = SimpleValueFactory.getInstance();
final IRI wktTypeUri = vf.createIRI("http://www.opengis.net/ont/geosparql#wktLiteral");
final IRI asWKT = vf.createIRI("http://www.opengis.net/ont/geosparql#asWKT");
final Set<Statement> statements = Sets.newHashSet(
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#dakar"), asWKT, vf.createLiteral("Point(-17.45 14.69)", wktTypeUri)),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#dakar2"), asWKT, vf.createLiteral("Point(-17.45 14.69)", wktTypeUri)),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#canberra"), asWKT, vf.createLiteral("Point(149.12 -35.31)", wktTypeUri)),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#brussels"), asWKT, vf.createLiteral("Point(4.35 50.85)", wktTypeUri)),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#amsterdam"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri)),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#amsterdam"), vf.createIRI("urn:containedIn"), vf.createLiteral("Europe")),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#dakar"), vf.createIRI("urn:containedIn"), vf.createLiteral("Africa")),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#dakar2"), vf.createIRI("urn:containedIn"), vf.createLiteral("Africa")),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#brussels"), vf.createIRI("urn:containedIn"), vf.createLiteral("Europe")));
// The expected results of the SPARQL query once the PCJ has been computed.
final Set<BindingSet> expectedResults = new HashSet<>();
MapBindingSet bs = new MapBindingSet();
bs.addBinding("cityA", vf.createIRI("tag:rya.apache.org,2017:ex#dakar"));
bs.addBinding("cityB", vf.createIRI("tag:rya.apache.org,2017:ex#dakar2"));
expectedResults.add(bs);
bs = new MapBindingSet();
bs.addBinding("cityA", vf.createIRI("tag:rya.apache.org,2017:ex#dakar2"));
bs.addBinding("cityB", vf.createIRI("tag:rya.apache.org,2017:ex#dakar"));
expectedResults.add(bs);
bs = new MapBindingSet();
bs.addBinding("cityA", vf.createIRI("tag:rya.apache.org,2017:ex#brussels"));
bs.addBinding("cityB", vf.createIRI("tag:rya.apache.org,2017:ex#amsterdam"));
expectedResults.add(bs);
bs = new MapBindingSet();
bs.addBinding("cityA", vf.createIRI("tag:rya.apache.org,2017:ex#amsterdam"));
bs.addBinding("cityB", vf.createIRI("tag:rya.apache.org,2017:ex#brussels"));
expectedResults.add(bs);
runTest(sparql, statements, expectedResults);
}
/**
* sfwithin function test. This requires full blown JTS.
* If you see this error: "Unknown Shape definition [POLYGON" ...
* Then:
* (from Solr docs:) the field definition needs the attribute
* spatialContextFactory="com.spatial4j.core.context.jts.JtsSpatialContextFactory"
* or (this works) this system property must be set to :
* SpatialContextFactory=com.spatial4j.core.context.jts.JtsSpatialContextFactory
* If you see:
* java.lang.UnsupportedOperationException: Not supported due to licensing issues. Feel free to provide your own implementation by using something like JTS.
* Then add a bit of code to replace the default one that comes with RDF4J:
* SpatialSupportInitializer.java
* Here is one: https://bitbucket.org/pulquero/sesame-geosparql-jts
*/
@Test
public void withGeoSpatialSupportInitializer() throws Exception {
final String sparql =
"PREFIX geo: <http://www.opengis.net/ont/geosparql#> " +
"PREFIX ryageo: <tag:rya.apache.org,2017:function/geo#> " +
"PREFIX geof: <http://www.opengis.net/def/function/geosparql/> " +
"SELECT ?feature ?point ?wkt { " +
"?feature a geo:Feature . " +
"?feature geo:hasGeometry ?point . " +
"?point a geo:Point . " +
"?point geo:asWKT ?wkt . " +
"FILTER(geof:sfWithin(?wkt, \"POLYGON((-78 39, -76 39, -76 38, -78 38, -78 39))\"^^geo:wktLiteral)) " +
"}";
final ValueFactory vf = SimpleValueFactory.getInstance();
final Set<Statement> statements = Sets.newHashSet(
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#feature"), vf.createIRI("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), vf.createIRI("http://www.opengis.net/ont/geosparql#Feature")),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#feature"), vf.createIRI("http://www.opengis.net/ont/geosparql#hasGeometry"), vf.createIRI("tag:rya.apache.org,2017:ex#test_point")),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#test_point"), vf.createIRI("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), vf.createIRI("http://www.opengis.net/ont/geosparql#Point")),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#test_point"), vf.createIRI("http://www.opengis.net/ont/geosparql#asWKT"), vf.createLiteral("Point(-77.03524 38.889468)", vf.createIRI("http://www.opengis.net/ont/geosparql#wktLiteral"))),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#skip_point"), vf.createIRI("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), vf.createIRI("http://www.opengis.net/ont/geosparql#Point")),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#skip_point"), vf.createIRI("http://www.opengis.net/ont/geosparql#asWKT"), vf.createLiteral("Point(-10 10)", vf.createIRI("http://www.opengis.net/ont/geosparql#wktLiteral"))));
// Register geo functions from RDF4J is done automatically via SPI.
// The expected results of the SPARQL query once the PCJ has been computed.
final Set<BindingSet> expectedResults = new HashSet<>();
final MapBindingSet bs = new MapBindingSet();
bs.addBinding("wkt", vf.createLiteral("Point(-77.03524 38.889468)", vf.createIRI("http://www.opengis.net/ont/geosparql#wktLiteral")));
bs.addBinding("feature", vf.createIRI("tag:rya.apache.org,2017:ex#feature"));
bs.addBinding("point", vf.createIRI("tag:rya.apache.org,2017:ex#test_point"));
expectedResults.add(bs);
runTest(sparql, statements, expectedResults);
}
/**
* This test does not rely on geoTools. The default implementation in RDF4J handles point intersections.
*/
@Test
public void withGeoIntersectsPoint() throws Exception {
final String sparql =
"PREFIX geo: <http://www.opengis.net/ont/geosparql#> " +
"PREFIX geof: <http://www.opengis.net/def/function/geosparql/> " +
"PREFIX uom: <http://www.opengis.net/def/uom/OGC/1.0/> " +
"SELECT ?cityA ?cityB { " +
"?cityA <urn:containedIn> ?continent. " +
"?cityB <urn:containedIn> ?continent. " +
"?cityA geo:asWKT ?coord1 . " +
"?cityB geo:asWKT ?coord2 . " +
" FILTER ( geof:sfIntersects(?coord1, ?coord2) ) " +
" FILTER ( !sameTerm (?cityA, ?cityB) ) " +
"}";
final ValueFactory vf = SimpleValueFactory.getInstance();
final IRI wktTypeUri = vf.createIRI("http://www.opengis.net/ont/geosparql#wktLiteral");
final IRI asWKT = vf.createIRI("http://www.opengis.net/ont/geosparql#asWKT");
final Set<Statement> statements = Sets.newHashSet(
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#dakar"), asWKT, vf.createLiteral("Point(-17.45 14.69)", wktTypeUri)),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#canberra"), asWKT, vf.createLiteral("Point(149.12 -35.31)", wktTypeUri)),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#brussels"), asWKT, vf.createLiteral("Point(4.35 50.85)", wktTypeUri)),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#amsterdam"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri)),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#amsterdam"), vf.createIRI("urn:containedIn"), vf.createLiteral("Europe")),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#amsterdam2"), vf.createIRI("urn:containedIn"), vf.createLiteral("Europe")),
vf.createStatement(vf.createIRI("tag:rya.apache.org,2017:ex#amsterdam2"), asWKT, vf.createLiteral("Point(4.9 52.37)", wktTypeUri)));
// The expected results of the SPARQL query once the PCJ has been computed.
final Set<BindingSet> expectedResults = new HashSet<>();
MapBindingSet bs = new MapBindingSet();
bs.addBinding("cityA", vf.createIRI("tag:rya.apache.org,2017:ex#amsterdam"));
bs.addBinding("cityB", vf.createIRI("tag:rya.apache.org,2017:ex#amsterdam2"));
expectedResults.add(bs);
bs = new MapBindingSet();
bs.addBinding("cityA", vf.createIRI("tag:rya.apache.org,2017:ex#amsterdam2"));
bs.addBinding("cityB", vf.createIRI("tag:rya.apache.org,2017:ex#amsterdam"));
expectedResults.add(bs);
runTest(sparql, statements, expectedResults);
}
@Test
public void withTemporal() throws Exception {
// Find all stored dates.
final String sparql =
"PREFIX time: <http://www.w3.org/2006/time#> " +
"PREFIX xml: <http://www.w3.org/2001/XMLSchema#> " +
"PREFIX tempo: <tag:rya-rdf.org,2015:temporal#> " +
"SELECT ?event ?time { " +
"?event time:inXSDDateTime ?time . " +
"FILTER(?time > '2001-01-01T01:01:03-08:00'^^xml:dateTime) " + // after 3 seconds
"FILTER('2007-01-01T01:01:01+09:00'^^xml:dateTime > ?time ) " + // 2006/12/31 include 2006, not 2007,8
"}";
// create some resources and literals to make statements out of
final ValueFactory vf = SimpleValueFactory.getInstance();
final DatatypeFactory dtf = DatatypeFactory.newInstance();
final IRI dtPredUri = vf.createIRI("http://www.w3.org/2006/time#inXSDDateTime");
final IRI eventz = vf.createIRI("<http://eventz>");
final Set<Statement> statements = Sets.newHashSet(
vf.createStatement(eventz, vf.createIRI("http://www.w3.org/1999/02/22-rdf-syntax-ns#type"), vf.createIRI("<http://www.w3.org/2006/time#Instant>")),
vf.createStatement(eventz, dtPredUri, vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:01-08:00"))), // 1 second
vf.createStatement(eventz, dtPredUri, vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T04:01:02.000-05:00"))), // 2 seconds
vf.createStatement(eventz, dtPredUri, vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:03-08:00"))), // 3 seconds
vf.createStatement(eventz, dtPredUri, vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T01:01:03.999-08:00"))), // 4 seconds
vf.createStatement(eventz, dtPredUri, vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T09:01:05Z"))), // 5 seconds
vf.createStatement(eventz, dtPredUri, vf.createLiteral(dtf.newXMLGregorianCalendar("2006-01-01T05:00:00.000Z"))),
vf.createStatement(eventz, dtPredUri, vf.createLiteral(dtf.newXMLGregorianCalendar("2007-01-01T05:00:00.000Z"))),
vf.createStatement(eventz, dtPredUri, vf.createLiteral(dtf.newXMLGregorianCalendar("2008-01-01T05:00:00.000Z"))));
final Set<BindingSet> expectedResults = new HashSet<>();
MapBindingSet bs = new MapBindingSet();
bs.addBinding("time", vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T09:01:05.000Z")));
bs.addBinding("event", eventz);
expectedResults.add(bs);
bs = new MapBindingSet();
bs.addBinding("time", vf.createLiteral(dtf.newXMLGregorianCalendar("2006-01-01T05:00:00.000Z")));
bs.addBinding("event", eventz);
expectedResults.add(bs);
bs = new MapBindingSet();
bs.addBinding("time", vf.createLiteral(dtf.newXMLGregorianCalendar("2001-01-01T09:01:03.999Z")));
bs.addBinding("event", eventz);
expectedResults.add(bs);
runTest(sparql, statements, expectedResults);
}
public void runTest(final String sparql, final Collection<Statement> statements, final Collection<BindingSet> expectedResults) throws Exception {
requireNonNull(sparql);
requireNonNull(statements);
requireNonNull(expectedResults);
// Register the PCJ with Rya.
final Instance accInstance = super.getAccumuloConnector().getInstance();
final Connector accumuloConn = super.getAccumuloConnector();
final RyaClient ryaClient = AccumuloRyaClientFactory.build(new AccumuloConnectionDetails(
getUsername(),
getPassword().toCharArray(),
accInstance.getInstanceName(),
accInstance.getZooKeepers()), accumuloConn);
ryaClient.getCreatePCJ().createPCJ(getRyaInstanceName(), sparql);
// Write the data to Rya.
final SailRepositoryConnection ryaConn = super.getRyaSailRepository().getConnection();
ryaConn.begin();
ryaConn.add(statements);
ryaConn.commit();
ryaConn.close();
// Wait for the Fluo application to finish computing the end result.
super.getMiniFluo().waitForObservers();
// Fetch the value that is stored within the PCJ table.
try(final PrecomputedJoinStorage pcjStorage = new AccumuloPcjStorage(accumuloConn, getRyaInstanceName())) {
final String pcjId = pcjStorage.listPcjs().get(0);
final Set<BindingSet> results = Sets.newHashSet( pcjStorage.listResults(pcjId) );
// Ensure the result of the query matches the expected result.
assertEquals(expectedResults, results);
}
}
}