blob: 3482e0dc554ebb4880a9bc7d4b844bdc6db5eef9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.indexing.pcj.fluo.app.query;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil.PeriodicQueryNodeRelocator;
import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil.PeriodicQueryNodeVisitor;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.query.MalformedQueryException;
import org.eclipse.rdf4j.query.algebra.Filter;
import org.eclipse.rdf4j.query.algebra.FunctionCall;
import org.eclipse.rdf4j.query.algebra.Join;
import org.eclipse.rdf4j.query.algebra.Projection;
import org.eclipse.rdf4j.query.algebra.QueryModelNode;
import org.eclipse.rdf4j.query.algebra.TupleExpr;
import org.eclipse.rdf4j.query.algebra.ValueConstant;
import org.eclipse.rdf4j.query.algebra.ValueExpr;
import org.eclipse.rdf4j.query.algebra.Var;
import org.eclipse.rdf4j.query.algebra.helpers.AbstractQueryModelVisitor;
import org.eclipse.rdf4j.query.parser.ParsedQuery;
import org.eclipse.rdf4j.query.parser.sparql.SPARQLParser;
import org.junit.Assert;
import org.junit.Test;
/**
 * Unit tests for {@link PeriodicQueryUtil}.
 *
 * <p>The tests cover three areas:
 * <ul>
 *   <li>building a {@link PeriodicQueryNode} from a {@link FunctionCall} via
 *       {@link PeriodicQueryUtil#getPeriodicQueryNode},</li>
 *   <li>the effect of {@link PeriodicQueryNodeVisitor} and {@link PeriodicQueryNodeRelocator}
 *       on a parsed SPARQL query tree, and</li>
 *   <li>the variable orders of the metadata produced by {@link SparqlFluoQueryBuilder} for a
 *       periodic aggregation query.</li>
 * </ul>
 */
public class PeriodicQueryUtilTest {

    private static final ValueFactory VF = SimpleValueFactory.getInstance();

    /**
     * A function call whose URI is not the periodic query URI must not yield a periodic node.
     */
    @Test
    public void periodicNodeNotPresentTest() throws Exception {
        List<ValueExpr> values = Arrays.asList(
                new Var("time"),
                new ValueConstant(VF.createLiteral(12.0)),
                new ValueConstant(VF.createLiteral(6.0)),
                new ValueConstant(VF.createIRI(PeriodicQueryUtil.temporalNameSpace + "hours")));
        FunctionCall func = new FunctionCall("uri:func", values);

        Optional<PeriodicQueryNode> node = PeriodicQueryUtil.getPeriodicQueryNode(func, new Join());

        Assert.assertFalse("No periodic node expected for a non-periodic function URI", node.isPresent());
    }

    /**
     * A periodic function call with a 12 hour window and a 6 hour period must yield a node
     * whose window and period are expressed in milliseconds.
     */
    @Test
    public void periodicNodePresentTest() throws Exception {
        List<ValueExpr> values = Arrays.asList(
                new Var("time"),
                new ValueConstant(VF.createLiteral(12.0)),
                new ValueConstant(VF.createLiteral(6.0)),
                new ValueConstant(VF.createIRI(PeriodicQueryUtil.temporalNameSpace + "hours")));
        FunctionCall func = new FunctionCall(PeriodicQueryUtil.PeriodicQueryURI, values);

        Optional<PeriodicQueryNode> actual = PeriodicQueryUtil.getPeriodicQueryNode(func, new Join());

        Assert.assertTrue("Periodic node expected for the periodic function URI", actual.isPresent());
        PeriodicQueryNode expected = expectedNode(TimeUnit.HOURS.toMillis(12), TimeUnit.HOURS.toMillis(6));
        Assert.assertTrue(periodicNodesEqualIgnoreArg(actual.get(), expected));
    }

    /**
     * A fractional period (half an hour) combined with an integer window (one hour) must be
     * converted to the matching millisecond values.
     */
    @Test
    public void periodicNodeFractionalDurationTest() throws Exception {
        List<ValueExpr> values = Arrays.asList(
                new Var("time"),
                new ValueConstant(VF.createLiteral(1)),
                new ValueConstant(VF.createLiteral(.5)),
                new ValueConstant(VF.createIRI(PeriodicQueryUtil.temporalNameSpace + "hours")));
        FunctionCall func = new FunctionCall(PeriodicQueryUtil.PeriodicQueryURI, values);

        Optional<PeriodicQueryNode> actual = PeriodicQueryUtil.getPeriodicQueryNode(func, new Join());

        Assert.assertTrue("Periodic node expected for the periodic function URI", actual.isPresent());
        PeriodicQueryNode expected = expectedNode(TimeUnit.HOURS.toMillis(1), TimeUnit.MINUTES.toMillis(30));
        Assert.assertTrue(periodicNodesEqualIgnoreArg(actual.get(), expected));
    }

    /**
     * After {@link PeriodicQueryNodeVisitor} has visited a parsed query containing a periodic
     * filter, the tree must contain a {@link PeriodicQueryNode} with the window (12 hours) and
     * period (6 hours) given in the filter.
     */
    @Test
    public void testPeriodicNodePlacement() throws MalformedQueryException {
        String query = "prefix function: <http://org.apache.rya/function#> " //n
                + "prefix time: <http://www.w3.org/2006/time#> " //n
                + "prefix fn: <http://www.w3.org/2006/fn#> " //n
                + "select ?obs ?time ?lat where {" //n
                + "Filter(function:periodic(?time, 12.0, 6.0,time:hours)) " //n
                + "Filter(fn:test(?lat, 25)) " //n
                + "?obs <uri:hasTime> ?time. " //n
                + "?obs <uri:hasLattitude> ?lat }"; //n
        TupleExpr te = parseAndInsertPeriodicNode(query);

        PeriodicNodeCollector collector = new PeriodicNodeCollector();
        te.visit(collector);

        PeriodicQueryNode actual = collector.getPeriodicQueryNode();
        // Fail with a clear message instead of a NullPointerException in the comparison helper.
        Assert.assertNotNull("No PeriodicQueryNode found in the query tree", actual);
        PeriodicQueryNode expected = expectedNode(TimeUnit.HOURS.toMillis(12), TimeUnit.HOURS.toMillis(6));
        Assert.assertTrue(periodicNodesEqualIgnoreArg(expected, actual));
    }

    /**
     * Checks where the {@link PeriodicQueryNode} sits in the tree: the collector counts two
     * Projection/Filter nodes on the way to it before {@link PeriodicQueryNodeRelocator} runs,
     * and one afterwards. The node's window and period must be unchanged by the relocation.
     */
    @Test
    public void testPeriodicNodeLocation() throws MalformedQueryException {
        String query = "prefix function: <http://org.apache.rya/function#> " //n
                + "prefix time: <http://www.w3.org/2006/time#> " //n
                + "prefix fn: <http://www.w3.org/2006/fn#> " //n
                + "select ?obs ?time ?lat where {" //n
                + "Filter(function:periodic(?time, 1,.5,time:hours)) " //n
                + "Filter(fn:test(?lat, 25)) " //n
                + "?obs <uri:hasTime> ?time. " //n
                + "?obs <uri:hasLattitude> ?lat }"; //n
        TupleExpr te = parseAndInsertPeriodicNode(query);

        PeriodicNodeCollector collector = new PeriodicNodeCollector();
        te.visit(collector);
        Assert.assertEquals(2, collector.getPos());

        te.visit(new PeriodicQueryNodeRelocator());
        // Clear the node as well as the counter so the assertions below cannot be
        // satisfied by state left over from the traversal before relocation.
        collector.reset();
        te.visit(collector);
        Assert.assertEquals(1, collector.getPos());

        PeriodicQueryNode actual = collector.getPeriodicQueryNode();
        Assert.assertNotNull("No PeriodicQueryNode found in the query tree after relocation", actual);
        PeriodicQueryNode expected = expectedNode(TimeUnit.HOURS.toMillis(1), TimeUnit.MINUTES.toMillis(30));
        Assert.assertTrue(periodicNodesEqualIgnoreArg(expected, actual));
    }

    /**
     * For a periodic aggregation query, the periodic, query and aggregation metadata built by
     * {@link SparqlFluoQueryBuilder} must each list
     * {@link IncrementalUpdateConstants#PERIODIC_BIN_ID} first in their variable order, and
     * exactly one aggregation metadata entry must be produced.
     */
    @Test
    public void testFluoQueryVarOrders() throws MalformedQueryException, UnsupportedQueryException {
        String query = "prefix function: <http://org.apache.rya/function#> " //n
                + "prefix time: <http://www.w3.org/2006/time#> " //n
                + "select (count(?obs) as ?total) where {" //n
                + "Filter(function:periodic(?time, 12.4, 6.2,time:hours)) " //n
                + "?obs <uri:hasTime> ?time. " //n
                + "?obs <uri:hasLattitude> ?lat }"; //n
        SparqlFluoQueryBuilder builder = new SparqlFluoQueryBuilder();
        builder.setSparql(query);
        builder.setFluoQueryId(NodeType.generateNewFluoIdForType(NodeType.QUERY));
        FluoQuery fluoQuery = builder.build();

        PeriodicQueryMetadata periodicMeta = fluoQuery.getPeriodicQueryMetadata().orNull();
        Assert.assertNotNull("Periodic query metadata expected for a periodic query", periodicMeta);
        VariableOrder periodicVars = periodicMeta.getVariableOrder();
        Assert.assertEquals(IncrementalUpdateConstants.PERIODIC_BIN_ID, periodicVars.getVariableOrders().get(0));

        QueryMetadata queryMeta = fluoQuery.getQueryMetadata();
        VariableOrder queryVars = queryMeta.getVariableOrder();
        Assert.assertEquals(IncrementalUpdateConstants.PERIODIC_BIN_ID, queryVars.getVariableOrders().get(0));

        Collection<AggregationMetadata> aggMetaCollection = fluoQuery.getAggregationMetadata();
        Assert.assertEquals(1, aggMetaCollection.size());
        AggregationMetadata aggMeta = aggMetaCollection.iterator().next();
        VariableOrder aggVars = aggMeta.getVariableOrder();
        Assert.assertEquals(IncrementalUpdateConstants.PERIODIC_BIN_ID, aggVars.getVariableOrders().get(0));
    }

    /**
     * Parses the given SPARQL query and runs {@link PeriodicQueryNodeVisitor} over the
     * resulting tuple expression.
     *
     * @param sparql the SPARQL query text
     * @return the visited tuple expression
     * @throws MalformedQueryException if the query cannot be parsed
     */
    private static TupleExpr parseAndInsertPeriodicNode(String sparql) throws MalformedQueryException {
        ParsedQuery parsed = new SPARQLParser().parseQuery(sparql, null);
        TupleExpr te = parsed.getTupleExpr();
        te.visit(new PeriodicQueryNodeVisitor());
        return te;
    }

    /**
     * Builds the node a test expects: the given window and period in milliseconds, temporal
     * variable {@code "time"}, and an empty {@link Join} as a placeholder argument (the
     * argument is ignored by {@link #periodicNodesEqualIgnoreArg}).
     *
     * @param windowMillis expected window size in milliseconds
     * @param periodMillis expected period in milliseconds
     * @return the expected node
     */
    private static PeriodicQueryNode expectedNode(long windowMillis, long periodMillis) {
        return new PeriodicQueryNode(windowMillis, periodMillis, TimeUnit.MILLISECONDS, "time", new Join());
    }

    /**
     * Compares two periodic nodes by period, window size, temporal variable and time unit,
     * deliberately ignoring their child argument.
     *
     * @param node1 first node, must not be null
     * @param node2 second node, must not be null
     * @return {@code true} if all four compared properties are equal
     */
    private static boolean periodicNodesEqualIgnoreArg(PeriodicQueryNode node1, PeriodicQueryNode node2) {
        return new EqualsBuilder()
                .append(node1.getPeriod(), node2.getPeriod())
                .append(node1.getWindowSize(), node2.getWindowSize())
                .append(node1.getTemporalVariable(), node2.getTemporalVariable())
                .append(node1.getUnit(), node2.getUnit())
                .build();
    }

    /**
     * Visitor that records the {@link PeriodicQueryNode} it encounters and counts the
     * {@link Filter} and {@link Projection} nodes it visits. Because {@link #meetOther} does
     * not descend into the node it receives, the count reflects how many Filter/Projection
     * nodes sit above that node on the visited path.
     */
    private static class PeriodicNodeCollector extends AbstractQueryModelVisitor<RuntimeException> {

        private PeriodicQueryNode periodicNode;
        private int count = 0;

        /**
         * @return the last periodic node seen since construction or the last {@link #reset()},
         *         or {@code null} if none was seen
         */
        public PeriodicQueryNode getPeriodicQueryNode() {
            return periodicNode;
        }

        /**
         * @return the number of Filter and Projection nodes visited since construction or the
         *         last {@link #reset()}
         */
        public int getPos() {
            return count;
        }

        /**
         * Clears the counter and the captured node so the collector can be reused for another
         * traversal without carrying over earlier results.
         */
        public void reset() {
            count = 0;
            periodicNode = null;
        }

        @Override
        public void meet(Filter node) {
            count++;
            node.getArg().visit(this);
        }

        @Override
        public void meet(Projection node) {
            count++;
            node.getArg().visit(this);
        }

        @Override
        public void meetOther(QueryModelNode node) {
            if (node instanceof PeriodicQueryNode) {
                periodicNode = (PeriodicQueryNode) node;
            }
        }
    }
}