// blob: d4034507fc800b83da2c293342fe7039ab1072e8
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.rya.periodic.notification.pruner;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.xml.datatype.DatatypeFactory;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.Snapshot;
import org.apache.fluo.api.client.scanner.ColumnScanner;
import org.apache.fluo.api.client.scanner.RowScanner;
import org.apache.fluo.api.data.Bytes;
import org.apache.fluo.api.data.ColumnValue;
import org.apache.fluo.api.data.Span;
import org.apache.fluo.core.client.FluoClientImpl;
import org.apache.rya.api.resolver.RdfToRyaConversions;
import org.apache.rya.api.utils.CloseableIterator;
import org.apache.rya.indexing.pcj.fluo.api.CreatePeriodicQuery;
import org.apache.rya.indexing.pcj.fluo.api.InsertTriples;
import org.apache.rya.indexing.pcj.fluo.app.IncrementalUpdateConstants;
import org.apache.rya.indexing.pcj.fluo.app.NodeType;
import org.apache.rya.indexing.pcj.fluo.app.util.FluoQueryUtils;
import org.apache.rya.indexing.pcj.fluo.app.util.PeriodicQueryUtil;
import org.apache.rya.indexing.pcj.fluo.app.util.RowKeyUtil;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.PeriodicQueryStorageException;
import org.apache.rya.indexing.pcj.storage.accumulo.AccumuloPeriodicQueryResultStorage;
import org.apache.rya.indexing.pcj.storage.accumulo.VariableOrder;
import org.apache.rya.pcj.fluo.test.base.RyaExportITBase;
import org.apache.rya.periodic.notification.api.NodeBin;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.ValueFactory;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.model.vocabulary.XMLSchema;
import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.algebra.evaluation.QueryBindingSet;
import org.eclipse.rdf4j.query.impl.MapBindingSet;
import org.junit.Assert;
import org.junit.Test;
import com.google.common.collect.Sets;
/**
 * Integration test for {@link PeriodicQueryPrunerExecutor}.
 * <p>
 * The test registers a periodic query, ingests observations, checks that the expected
 * per-bin results show up in the periodic result storage, then hands the bins to the
 * pruner and asserts that neither the result storage nor the Fluo table still holds
 * entries for those bins.
 */
public class PeriodicNotificationBinPrunerIT extends RyaExportITBase {

    /** Period of the query under test (".5" hours in the SPARQL below), in milliseconds. */
    private static final long PERIOD_MILLIS = 1800000;

    /** Fixed time the test waits after queueing the bins before it checks that they were pruned. */
    private static final long PRUNER_WAIT_MILLIS = 10000;

    /**
     * Creates a periodic count query, ingests observations that fall into four consecutive
     * bins, verifies the stored results per bin, and then verifies that pruning every bin
     * leaves both the periodic result storage and Fluo empty for that bin.
     *
     * @throws Exception if any of the storage, Fluo or query-creation calls fail
     */
    @Test
    public void periodicPrunerTest() throws Exception {
        final String sparql = "prefix function: <http://org.apache.rya/function#> " // n
                + "prefix time: <http://www.w3.org/2006/time#> " // n
                + "select ?id (count(?obs) as ?total) where {" // n
                + "Filter(function:periodic(?time, 2, .5, time:hours)) " // n
                + "?obs <uri:hasTime> ?time. " // n
                + "?obs <uri:hasId> ?id } group by ?id"; // n

        // try-with-resources guarantees the client is released even when an assertion fails.
        try (FluoClient fluo = new FluoClientImpl(super.getFluoConfiguration())) {
            // initialize resources and create pcj
            final PeriodicQueryResultStorage periodicStorage = new AccumuloPeriodicQueryResultStorage(
                    super.getAccumuloConnector(), getRyaInstanceName());
            final CreatePeriodicQuery createPeriodicQuery = new CreatePeriodicQuery(fluo, periodicStorage);
            final String queryId = FluoQueryUtils
                    .convertFluoQueryIdToPcjId(createPeriodicQuery.createPeriodicQuery(sparql).getQueryId());

            // create statements to ingest into Fluo; the four timestamps step back from
            // "now" in 30 minute increments
            final ValueFactory vf = SimpleValueFactory.getInstance();
            final DatatypeFactory dtf = DatatypeFactory.newInstance();
            final ZonedDateTime time = ZonedDateTime.now();
            final long currentTime = time.toInstant().toEpochMilli();
            final ZonedDateTime zTime1 = time.minusMinutes(30);
            final String time1 = zTime1.format(DateTimeFormatter.ISO_INSTANT);
            final ZonedDateTime zTime2 = zTime1.minusMinutes(30);
            final String time2 = zTime2.format(DateTimeFormatter.ISO_INSTANT);
            final ZonedDateTime zTime3 = zTime2.minusMinutes(30);
            final String time3 = zTime3.format(DateTimeFormatter.ISO_INSTANT);
            final ZonedDateTime zTime4 = zTime3.minusMinutes(30);
            final String time4 = zTime4.format(DateTimeFormatter.ISO_INSTANT);

            final Collection<Statement> statements = Sets.newHashSet(
                    timeStatement(vf, dtf, "urn:obs_1", time1),
                    idStatement(vf, "urn:obs_1", "id_1"),
                    timeStatement(vf, dtf, "urn:obs_2", time2),
                    idStatement(vf, "urn:obs_2", "id_2"),
                    timeStatement(vf, dtf, "urn:obs_3", time3),
                    idStatement(vf, "urn:obs_3", "id_3"),
                    timeStatement(vf, dtf, "urn:obs_4", time4),
                    idStatement(vf, "urn:obs_4", "id_4"),
                    // obs_1 and obs_2 each receive a second, older timestamp
                    timeStatement(vf, dtf, "urn:obs_1", time4),
                    idStatement(vf, "urn:obs_1", "id_1"),
                    timeStatement(vf, dtf, "urn:obs_2", time3),
                    idStatement(vf, "urn:obs_2", "id_2"));

            // add statements to Fluo
            final InsertTriples inserter = new InsertTriples();
            statements.forEach(x -> inserter.insert(fluo, RdfToRyaConversions.convertStatement(x)));
            super.getMiniFluo().waitForObservers();

            // Bin ids are multiples of the period; the first bin is "now" rounded down to
            // a period boundary and the remaining bins follow at one-period steps.
            final long binId = (currentTime / PERIOD_MILLIS) * PERIOD_MILLIS;
            final long bin1 = binId;
            final long bin2 = binId + PERIOD_MILLIS;
            final long bin3 = binId + 2 * PERIOD_MILLIS;
            final long bin4 = binId + 3 * PERIOD_MILLIS;
            final long[] allBins = { bin1, bin2, bin3, bin4 };

            // Create the expected results of the SPARQL query once the PCJ has been
            // computed.
            final Set<BindingSet> expected1 = new HashSet<>();
            expected1.add(expectedResult(vf, "2", "id_1", bin1));
            expected1.add(expectedResult(vf, "2", "id_2", bin1));
            expected1.add(expectedResult(vf, "1", "id_3", bin1));
            expected1.add(expectedResult(vf, "1", "id_4", bin1));

            final Set<BindingSet> expected2 = new HashSet<>();
            expected2.add(expectedResult(vf, "1", "id_1", bin2));
            expected2.add(expectedResult(vf, "2", "id_2", bin2));
            expected2.add(expectedResult(vf, "1", "id_3", bin2));

            final Set<BindingSet> expected3 = new HashSet<>();
            expected3.add(expectedResult(vf, "1", "id_1", bin3));
            expected3.add(expectedResult(vf, "1", "id_2", bin3));

            final Set<BindingSet> expected4 = new HashSet<>();
            expected4.add(expectedResult(vf, "1", "id_1", bin4));

            // make sure that expected and actual results align after ingest
            compareResults(periodicStorage, queryId, bin1, expected1);
            compareResults(periodicStorage, queryId, bin2, expected2);
            compareResults(periodicStorage, queryId, bin3, expected3);
            compareResults(periodicStorage, queryId, bin4, expected4);

            final BlockingQueue<NodeBin> bins = new LinkedBlockingQueue<>();
            final PeriodicQueryPrunerExecutor pruner = new PeriodicQueryPrunerExecutor(periodicStorage, fluo, 1, bins);
            pruner.start();
            try {
                for (final long bin : allBins) {
                    bins.add(new NodeBin(queryId, bin));
                }

                // The test only hands bins to the pruner through the queue, so it waits a
                // fixed interval before checking that they have been processed.
                Thread.sleep(PRUNER_WAIT_MILLIS);

                // after pruning, no results may remain in the periodic result storage...
                for (final long bin : allBins) {
                    compareResults(periodicStorage, queryId, bin, new HashSet<>());
                }
                // ...nor in the Fluo table
                for (final long bin : allBins) {
                    compareFluoCounts(fluo, queryId, bin);
                }
            } finally {
                // stop the pruner even if one of the assertions above failed
                pruner.stop();
            }
        }
    }

    /**
     * Builds the statement {@code <observation> <uri:hasTime> "isoTime"}, with the time
     * converted to an XML Gregorian calendar literal.
     *
     * @param vf factory used to create the IRIs and the literal
     * @param dtf factory used to parse {@code isoTime}
     * @param observation IRI of the observation (statement subject)
     * @param isoTime lexical timestamp of the observation
     * @return the created statement
     */
    private static Statement timeStatement(final ValueFactory vf, final DatatypeFactory dtf, final String observation,
            final String isoTime) {
        return vf.createStatement(vf.createIRI(observation), vf.createIRI("uri:hasTime"),
                vf.createLiteral(dtf.newXMLGregorianCalendar(isoTime)));
    }

    /**
     * Builds the statement {@code <observation> <uri:hasId> "id"}.
     *
     * @param vf factory used to create the IRIs and the literal
     * @param observation IRI of the observation (statement subject)
     * @param id identifier literal attached to the observation
     * @return the created statement
     */
    private static Statement idStatement(final ValueFactory vf, final String observation, final String id) {
        return vf.createStatement(vf.createIRI(observation), vf.createIRI("uri:hasId"), vf.createLiteral(id));
    }

    /**
     * Builds one expected query result row.
     *
     * @param vf factory used to create the literals
     * @param total lexical value bound to {@code total} (typed as xsd:integer)
     * @param id value bound to {@code id} (typed as xsd:string)
     * @param bin value bound to {@code periodicBinId}
     * @return a binding set with the three bindings above
     */
    private static BindingSet expectedResult(final ValueFactory vf, final String total, final String id,
            final long bin) {
        final MapBindingSet bs = new MapBindingSet();
        bs.addBinding("total", vf.createLiteral(total, XMLSchema.INTEGER));
        bs.addBinding("id", vf.createLiteral(id, XMLSchema.STRING));
        bs.addBinding("periodicBinId", vf.createLiteral(bin));
        return bs;
    }

    /**
     * Asserts that the results stored for {@code queryId} in the given bin equal {@code expected}.
     *
     * @param periodicStorage storage to read the results from
     * @param queryId id of the periodic query
     * @param bin bin whose results are listed
     * @param expected the exact set of binding sets that must be present
     * @throws PeriodicQueryStorageException if the results cannot be listed
     * @throws Exception if closing the result iterator fails
     */
    private void compareResults(final PeriodicQueryResultStorage periodicStorage, final String queryId,
            final long bin, final Set<BindingSet> expected) throws PeriodicQueryStorageException, Exception {
        try (CloseableIterator<BindingSet> iter = periodicStorage.listResults(queryId, Optional.of(bin))) {
            final Set<BindingSet> actual = new HashSet<>();
            while (iter.hasNext()) {
                actual.add(iter.next());
            }
            Assert.assertEquals(expected, actual);
        }
    }

    /**
     * Asserts that Fluo holds no result entries for the given bin under any of the node ids
     * collected by {@link PeriodicQueryUtil#getPeriodicQueryNodeAncestorIds} for the query.
     *
     * @param client Fluo client used to open a snapshot
     * @param pcjId PCJ id of the periodic query
     * @param bin bin whose row prefix is scanned
     */
    private void compareFluoCounts(final FluoClient client, final String pcjId, final long bin) {
        final ValueFactory vf = SimpleValueFactory.getInstance();
        // Binding set containing only the bin id; together with the variable order it
        // yields the row-key prefix for the bin under each node.
        final QueryBindingSet bs = new QueryBindingSet();
        bs.addBinding(IncrementalUpdateConstants.PERIODIC_BIN_ID,
                vf.createLiteral(Long.toString(bin), XMLSchema.LONG));
        final VariableOrder varOrder = new VariableOrder(IncrementalUpdateConstants.PERIODIC_BIN_ID);

        try (Snapshot sx = client.newSnapshot()) {
            final String fluoQueryId = NodeType.generateNewIdForType(NodeType.QUERY, pcjId);
            final Set<String> ids = new HashSet<>();
            PeriodicQueryUtil.getPeriodicQueryNodeAncestorIds(sx, fluoQueryId, ids);

            for (final String id : ids) {
                final NodeType nodeType = NodeType.fromNodeId(id).orNull();
                if (nodeType == null) {
                    throw new RuntimeException("Invalid NodeType.");
                }

                final Bytes prefix = RowKeyUtil.makeRowKey(id, varOrder, bs);
                final RowScanner scanner = sx.scanner().fetch(nodeType.getResultColumn())
                        .over(Span.prefix(prefix)).byRow().build();

                // count every column value in every row under the prefix
                int count = 0;
                final Iterator<ColumnScanner> rows = scanner.iterator();
                while (rows.hasNext()) {
                    final Iterator<ColumnValue> values = rows.next().iterator();
                    while (values.hasNext()) {
                        values.next();
                        count++;
                    }
                }
                Assert.assertEquals("Fluo entries remain for node " + id + " in bin " + bin, 0, count);
            }
        }
    }
}